[jira] [Commented] (AIRFLOW-3080) Mysql OperationalError occurs during heartbeat or any DB operation
[ https://issues.apache.org/jira/browse/AIRFLOW-3080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618514#comment-16618514 ] Amit Ghosh commented on AIRFLOW-3080: - I am working on the fix and testing it now; I will commit it after testing it extensively. > Mysql OperationalError occurs during heartbeat or any DB operation > -- > > Key: AIRFLOW-3080 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3080 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler, worker >Affects Versions: 1.10.0 >Reporter: Amit Ghosh >Assignee: Amit Ghosh >Priority: Major > Original Estimate: 96h > Remaining Estimate: 96h > > When Airflow uses MySQL, has many worker instances, and no DAG has been > executed for a long time, MySQL raises "mysql_exceptions.OperationalError". > The main issue is that when connections sit idle for a long time, the first > DB request fails with this error, because MySQL marks a connection as stale > after a period with no activity from a given SQLAlchemy pool. I am working on > a fix that should also work for other databases, and will commit it.
> 1) Log Text = \{"log":"[2018-09-18 05:33:45,296] {jobs.py:748} ERROR - > (_mysql_exceptions.OperationalError) (2005, \"Unknown MySQL server host > 'mlp.prod.machine-learning-platform-prod.ms-df-cloudrdbms.prod.walmart.com' > (2)\") (Background on this error at: [http://sqlalche.me/e/e3q8]) > ","stream":"stdout","time":"2018-09-18T05:33:45.315547946Z"} > > 2) Log Text = {"log":" raise errorvalue > ","stream":"stderr","time":"2018-09-15T06:04:35.722310847Z"} > {"log":"sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) > (2013, 'Lost connection to MySQL server during query') [SQL: u'UPDATE job SET > latest_heartbeat=%s WHERE job.id = %s'] [parameters: (datetime.datetime(2018, > 9, 15, 6, 4, 23, 4294), 345143L)] (Background on this error at: > [http://sqlalche.me/e/e3q8]) > ","stream":"stderr","time":"2018-09-15T06:04:35.72232954Z"} > {"log":"[2018-09-15 06:04:35,844: ERROR/ForkPoolWorker-13] Command 'airflow > run dag_2063_baf60054-d0c7-41b2-8009-4d88f773dc79 web_crawl_pipeline > 2018-09-14T05:48:42 --local -sd > DAGS_FOLDER/1833_workflows/dag_2063_baf60054-d0c7-41b2-8009-4d88f773dc79.py ' > returned non-zero exit status 1 > ","stream":"stderr","time":"2018-09-15T06:04:35.847747612Z"} > {"log":"[2018-09-15 06:04:35,851: ERROR/ForkPoolWorker-13] Task > airflow.executors.celery_executor.execute_command[30141a5a-71da-4d28-a829-495aeca3cfa9] > raised unexpected: AirflowException('Celery command failed',) > ","stream":"stderr","time":"2018-09-15T06:04:35.855019453Z"} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
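The stale-connection failure above is typically mitigated at the connection-pool level. The fix in this issue is not shown, but as a hedged sketch: with SQLAlchemy one would pass `pool_recycle` (and, on SQLAlchemy 1.2+, `pool_pre_ping=True`) to `create_engine`. The pure-Python sketch below illustrates the recycling idea without depending on SQLAlchemy; the class and names are illustrative only.

```python
import time

# Illustrative pool that recycles connections older than a threshold,
# mirroring what SQLAlchemy's pool_recycle option does. This is NOT the
# fix committed for AIRFLOW-3080, just a sketch of the general mitigation.
class RecyclingPool:
    def __init__(self, make_conn, recycle_after=3600):
        self._make_conn = make_conn      # factory for a fresh DB connection
        self._recycle_after = recycle_after
        self._conn = None
        self._created_at = 0.0

    def acquire(self, now=None):
        now = time.time() if now is None else now
        # Replace the connection if it is older than the recycle threshold,
        # instead of handing back one MySQL may already have dropped.
        if self._conn is None or now - self._created_at > self._recycle_after:
            self._conn = self._make_conn()
            self._created_at = now
        return self._conn
```

With a `recycle_after` below MySQL's `wait_timeout`, the pool never hands out a connection that the server has silently closed.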
[jira] [Created] (AIRFLOW-3080) Mysql OperationalError occurs during heartbeat or any DB operation
Amit Ghosh created AIRFLOW-3080: --- Summary: Mysql OperationalError occurs during heartbeat or any DB operation Key: AIRFLOW-3080 URL: https://issues.apache.org/jira/browse/AIRFLOW-3080 Project: Apache Airflow Issue Type: Bug Components: scheduler, worker Affects Versions: 1.10.0 Reporter: Amit Ghosh Assignee: Amit Ghosh When Airflow uses MySQL, has many worker instances, and no DAG has been executed for a long time, MySQL raises "mysql_exceptions.OperationalError". The main issue is that when connections sit idle for a long time, the first DB request fails with this error, because MySQL marks a connection as stale after a period with no activity from a given SQLAlchemy pool. I am working on a fix that should also work for other databases, and will commit it. 1) Log Text = \{"log":"[2018-09-18 05:33:45,296] {jobs.py:748} ERROR - (_mysql_exceptions.OperationalError) (2005, \"Unknown MySQL server host 'mlp.prod.machine-learning-platform-prod.ms-df-cloudrdbms.prod.walmart.com' (2)\") (Background on this error at: [http://sqlalche.me/e/e3q8]) ","stream":"stdout","time":"2018-09-18T05:33:45.315547946Z"} 2) Log Text = {"log":" raise errorvalue ","stream":"stderr","time":"2018-09-15T06:04:35.722310847Z"} {"log":"sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (2013, 'Lost connection to MySQL server during query') [SQL: u'UPDATE job SET latest_heartbeat=%s WHERE job.id = %s'] [parameters: (datetime.datetime(2018, 9, 15, 6, 4, 23, 4294), 345143L)] (Background on this error at: [http://sqlalche.me/e/e3q8]) ","stream":"stderr","time":"2018-09-15T06:04:35.72232954Z"} {"log":"[2018-09-15 06:04:35,844: ERROR/ForkPoolWorker-13] Command 'airflow run dag_2063_baf60054-d0c7-41b2-8009-4d88f773dc79 web_crawl_pipeline 2018-09-14T05:48:42 --local -sd DAGS_FOLDER/1833_workflows/dag_2063_baf60054-d0c7-41b2-8009-4d88f773dc79.py ' returned non-zero exit status 1 ","stream":"stderr","time":"2018-09-15T06:04:35.847747612Z"} {"log":"[2018-09-15 06:04:35,851: 
ERROR/ForkPoolWorker-13] Task airflow.executors.celery_executor.execute_command[30141a5a-71da-4d28-a829-495aeca3cfa9] raised unexpected: AirflowException('Celery command failed',) ","stream":"stderr","time":"2018-09-15T06:04:35.855019453Z"}
[GitHub] seelmann commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
seelmann commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors URL: https://github.com/apache/incubator-airflow/pull/3596#discussion_r218305157 ## File path: airflow/models.py ## @@ -56,8 +56,8 @@ from sqlalchemy import ( Column, Integer, String, DateTime, Text, Boolean, ForeignKey, PickleType, -Index, Float, LargeBinary, UniqueConstraint) -from sqlalchemy import func, or_, and_, true as sqltrue +Index, Float, LargeBinary, UniqueConstraint, ForeignKeyConstraint) Review comment: Hm, the line is already not sorted (Index, Float) ;). Should I then sort the whole 3-line import statement? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] seelmann commented on issue #3913: [AIRFLOW-3072] Assign permission get_logs_with_metadata to viewer role
seelmann commented on issue #3913: [AIRFLOW-3072] Assign permission get_logs_with_metadata to viewer role URL: https://github.com/apache/incubator-airflow/pull/3913#issuecomment-422259970 Thanks @feng-tao for the approval. This is more a fix for 1.10, which does not include DAG-level permissions yet. Maybe this can be included in 1.10.1, @ashb?
[jira] [Commented] (AIRFLOW-2747) Explicit re-schedule of sensors
[ https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618498#comment-16618498 ] Stefan Seelmann commented on AIRFLOW-2747: -- I assume this "radhefa Roufique hossain" is a spam user. Can someone with admin access please delete the attached google_apis-23_r01.zip? Also reported in https://issues.apache.org/jira/browse/INFRA-17031 > Explicit re-schedule of sensors > --- > > Key: AIRFLOW-2747 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2747 > Project: Apache Airflow > Issue Type: Improvement > Components: core, operators >Affects Versions: 1.9.0 >Reporter: Stefan Seelmann >Assignee: Roufique hossain >Priority: Major > Fix For: 2.0.0 > > Attachments: Screenshot_2018-07-12_14-10-24.png, > Screenshot_2018-09-16_20-09-28.png, Screenshot_2018-09-16_20-19-23.png, > google_apis-23_r01.zip > > > By default, sensors block a worker and just sleep between pokes. This is very > inefficient, especially when there are many long-running sensors. > There is a hacky workaround: setting a small timeout value and a high retry > number. But that has drawbacks: > * Errors raised by sensors are hidden and the sensor retries too often > * The sensor is retried at a fixed time interval (with optional exponential > backoff) > * There are many attempts and many log files are generated > I'd like to propose an explicit reschedule mechanism: > * A new "reschedule" flag for sensors; if set to True it will raise an > AirflowRescheduleException that causes a reschedule. > * AirflowRescheduleException contains the (earliest) re-schedule date. > * Reschedule requests are recorded in a new `task_reschedule` table and > visualized in the Gantt view. > * A new TI dependency that checks if a sensor task is ready to be > re-scheduled. > Advantages: > * This change is backward compatible. Existing sensors behave like before. > But it's possible to set the "reschedule" flag.
> * The poke_interval, timeout, and soft_fail parameters are still respected > and used to calculate the next schedule time. > * Custom sensor implementations can even define the next sensible schedule > date by raising AirflowRescheduleException themselves. > * Existing TimeSensor and TimeDeltaSensor can also be changed to be > rescheduled when the time is reached. > * This mechanism can also be used by non-sensor operators (but then the new > ReadyToRescheduleDep has to be added to deps or BaseOperator). > Design decisions and caveats: > * When handling AirflowRescheduleException the `try_number` is decremented. > That means that subsequent runs use the same try number and write to the same > log file. > * Sensor TI dependency check now depends on `task_reschedule` table. However > only the BaseSensorOperator includes the new ReadyToRescheduleDep for now. > Open questions and TODOs: > * Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting > the state back to `NONE`? This would require more changes in scheduler code > and especially in the UI, but the state of a task would be more explicit and > more transparent to the user. > * Add example/test for a non-sensor operator > * Document the new feature
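The proposal above can be sketched in miniature. The classes below are illustrative stand-ins, not Airflow's actual implementation: a sensor that, instead of sleeping in the worker, raises an exception carrying the earliest date at which the scheduler should try it again.

```python
from datetime import datetime, timedelta

# Stand-in for the exception proposed in AIRFLOW-2747; the real class
# lives in Airflow and may differ in detail.
class AirflowRescheduleException(Exception):
    def __init__(self, reschedule_date):
        super().__init__(reschedule_date)
        self.reschedule_date = reschedule_date  # earliest time to retry

class RescheduleSensor:
    """Sketch of a sensor with the proposed 'reschedule' flag."""

    def __init__(self, poke_interval=60, reschedule=False):
        self.poke_interval = poke_interval
        self.reschedule = reschedule

    def poke(self):
        raise NotImplementedError  # subclasses check their condition here

    def execute(self, now):
        if self.poke():
            return True
        if self.reschedule:
            # Free the worker slot: hand the scheduler an explicit date
            # instead of sleeping between pokes.
            raise AirflowRescheduleException(
                now + timedelta(seconds=self.poke_interval))
        return False  # legacy behaviour: the caller sleeps and pokes again
```

Custom sensors could raise `AirflowRescheduleException` themselves with any sensible date, as the issue notes.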
[jira] [Commented] (AIRFLOW-3073) A note is needed in 'Data Profiling' doc page to remind users it's no longer supported in new webserver UI
[ https://issues.apache.org/jira/browse/AIRFLOW-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618440#comment-16618440 ] ASF GitHub Bot commented on AIRFLOW-3073: - feng-tao closed pull request #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver URL: https://github.com/apache/incubator-airflow/pull/3909 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: diff --git a/docs/profiling.rst b/docs/profiling.rst index 09102334b1..37608127ae 100644 --- a/docs/profiling.rst +++ b/docs/profiling.rst @@ -1,6 +1,12 @@ +.. TODO: This section would be removed after we migrate to www_rbac completely. + Data Profiling == +.. note:: + ``Adhoc Queries`` and ``Charts`` are no longer supported in the new FAB-based webserver + and UI, due to security concerns. + Part of being productive with data is having the right weapons to profile the data you are working with. Airflow provides a simple query interface to write SQL and get results quickly, and a charting application
> A note is needed in 'Data Profiling' doc page to remind users it's no > longer supported in new webserver UI > > > Key: AIRFLOW-3073 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3073 > Project: Apache Airflow > Issue Type: Improvement > Components: Documentation >Reporter: Xiaodong DENG >Assignee: Xiaodong DENG >Priority: Critical > > In [https://airflow.incubator.apache.org/profiling.html], it's not mentioned > at all that these features are no longer supported in the new webserver (FAB-based) > due to security concerns > (https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#breaking-changes).
[GitHub] feng-tao closed pull request #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver
feng-tao closed pull request #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver URL: https://github.com/apache/incubator-airflow/pull/3909 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: diff --git a/docs/profiling.rst b/docs/profiling.rst index 09102334b1..37608127ae 100644 --- a/docs/profiling.rst +++ b/docs/profiling.rst @@ -1,6 +1,12 @@ +.. TODO: This section would be removed after we migrate to www_rbac completely. + Data Profiling == +.. note:: + ``Adhoc Queries`` and ``Charts`` are no longer supported in the new FAB-based webserver + and UI, due to security concerns. + Part of being productive with data is having the right weapons to profile the data you are working with. Airflow provides a simple query interface to write SQL and get results quickly, and a charting application
[GitHub] XD-DENG commented on issue #3903: [AIRFLOW-3067] Display www_rbac Flask flash msg properly
XD-DENG commented on issue #3903: [AIRFLOW-3067] Display www_rbac Flask flash msg properly URL: https://github.com/apache/incubator-airflow/pull/3903#issuecomment-422235349 Hi @feng-tao, I have updated the commit as discussed. PTAL. Thanks.
[GitHub] XD-DENG commented on a change in pull request #3903: [AIRFLOW-3067] Display www_rbac Flask flash msg properly
XD-DENG commented on a change in pull request #3903: [AIRFLOW-3067] Display www_rbac Flask flash msg properly URL: https://github.com/apache/incubator-airflow/pull/3903#discussion_r218284881 ## File path: airflow/www_rbac/static/css/bootstrap-theme.css ## @@ -4949,6 +4949,28 @@ a.thumbnail.active { .alert-danger .alert-link { color: #843534; } +.alert-message { + background-color: #d9edf7; + border-color: #bce8f1; + color: #31708f; +} +.alert-message hr { + border-top-color: #a6e1ec; +} +.alert-message .alert-link { + color: #245269; +} +.alert-error { Review comment: "Copying" `alert-danger`'s specs, such as its colors.
[GitHub] XD-DENG commented on a change in pull request #3903: [AIRFLOW-3067] Display www_rbac Flask flash msg properly
XD-DENG commented on a change in pull request #3903: [AIRFLOW-3067] Display www_rbac Flask flash msg properly URL: https://github.com/apache/incubator-airflow/pull/3903#discussion_r218284847 ## File path: airflow/www_rbac/static/css/bootstrap-theme.css ## @@ -4949,6 +4949,28 @@ a.thumbnail.active { .alert-danger .alert-link { color: #843534; } +.alert-message { Review comment: "Copying" `alert-info`'s specs, such as its colors.
[GitHub] XD-DENG commented on a change in pull request #3903: [AIRFLOW-3067] Display www_rbac Flask flash msg properly
XD-DENG commented on a change in pull request #3903: [AIRFLOW-3067] Display www_rbac Flask flash msg properly URL: https://github.com/apache/incubator-airflow/pull/3903#discussion_r218284847 ## File path: airflow/www_rbac/static/css/bootstrap-theme.css ## @@ -4949,6 +4949,28 @@ a.thumbnail.active { .alert-danger .alert-link { color: #843534; } +.alert-message { Review comment: "Copying" `alert-info`
[GitHub] XD-DENG commented on a change in pull request #3903: [AIRFLOW-3067] Display www_rbac Flask flash msg properly
XD-DENG commented on a change in pull request #3903: [AIRFLOW-3067] Display www_rbac Flask flash msg properly URL: https://github.com/apache/incubator-airflow/pull/3903#discussion_r218284881 ## File path: airflow/www_rbac/static/css/bootstrap-theme.css ## @@ -4949,6 +4949,28 @@ a.thumbnail.active { .alert-danger .alert-link { color: #843534; } +.alert-message { + background-color: #d9edf7; + border-color: #bce8f1; + color: #31708f; +} +.alert-message hr { + border-top-color: #a6e1ec; +} +.alert-message .alert-link { + color: #245269; +} +.alert-error { Review comment: "Copying" `alert-danger`
[GitHub] XD-DENG commented on issue #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver
XD-DENG commented on issue #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver URL: https://github.com/apache/incubator-airflow/pull/3909#issuecomment-422231066 Thanks @feng-tao
[GitHub] feng-tao commented on issue #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver
feng-tao commented on issue #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver URL: https://github.com/apache/incubator-airflow/pull/3909#issuecomment-49625 Will merge once CI passes.
[GitHub] feng-tao commented on issue #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver
feng-tao commented on issue #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver URL: https://github.com/apache/incubator-airflow/pull/3909#issuecomment-49576 LGTM
[GitHub] XD-DENG commented on a change in pull request #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver
XD-DENG commented on a change in pull request #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver URL: https://github.com/apache/incubator-airflow/pull/3909#discussion_r218280627 ## File path: docs/profiling.rst ## @@ -1,6 +1,12 @@ +.. TODO: This section would be removed after we migrate to www_rbac completely. Review comment: This line will not be compiled into the documentation. It's a note for maintainers only. Ref: http://docutils.sourceforge.net/docs/user/rst/quickref.html#comments
[GitHub] XD-DENG commented on a change in pull request #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver
XD-DENG commented on a change in pull request #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver URL: https://github.com/apache/incubator-airflow/pull/3909#discussion_r218280627 ## File path: docs/profiling.rst ## @@ -1,6 +1,12 @@ +.. TODO: This section would be removed after we migrate to www_rbac completely. Review comment: This line will not be compiled into the documentation. Ref: http://docutils.sourceforge.net/docs/user/rst/quickref.html#comments
[GitHub] XD-DENG commented on issue #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver
XD-DENG commented on issue #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver URL: https://github.com/apache/incubator-airflow/pull/3909#issuecomment-48842 Hi @feng-tao, I changed it to: **_Adhoc Queries_ and _Charts_ are no longer supported in the new FAB-based webserver and UI, due to security concerns.** This may be more explicit/precise. Regarding the note about removing this section after the UI migration, I added it as a TODO comment, because I don't think it's necessary to expose it to readers.
[GitHub] XD-DENG commented on issue #3903: [AIRFLOW-3067] categorize www_rbac Flask flash msg properly
XD-DENG commented on issue #3903: [AIRFLOW-3067] categorize www_rbac Flask flash msg properly URL: https://github.com/apache/incubator-airflow/pull/3903#issuecomment-46766 Hi @feng-tao, yes, my concern was that the `npm install` operation would "overwrite" our updates in `bootstrap-theme.css`. But it seems that's not the case? (Sorry, I'm not that familiar with `npm`.) Then I will go ahead and modify the commit (I do agree with you that it's much more efficient/elegant to make the change in `bootstrap-theme.css` instead).
[GitHub] afernandez edited a comment on issue #3547: [AIRFLOW-2659] Improve Robustness of Operators in Airflow during Infra Outages
afernandez edited a comment on issue #3547: [AIRFLOW-2659] Improve Robustness of Operators in Airflow during Infra Outages URL: https://github.com/apache/incubator-airflow/pull/3547#issuecomment-422210823 @Fokko My apologies for replying 2 months later (I was working on other high-priority projects and am now returning to work on Airflow). Good question; the primary reason is that the retries in Airflow are mainly meant to handle transient errors, where 3-5 retries suffice (or maybe a 5-minute window). This PR tries to address a larger infrastructure outage that can last several hours. A user may have a legitimate case for only retrying 3 times (say, a particular service is flaky at really high load). Having shorter retries for transient errors ensures enough robustness for flaky services, but not so much that they completely mask unreliable services. The solution I'm proposing tries to be more intelligent by applying business logic in the particular hook. If it's indeed a transient error, then retry according to the existing Airflow logic; but if it's a complete infrastructure outage, then perhaps retry for 2-4 hours. Luckily, services like Hive, Presto, Spark, etc., can provide enough context to make this determination.
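The hook-aware retry idea described above can be sketched as follows. The error classes and time windows here are invented for illustration; a real hook (Hive, Presto, Spark, ...) would map its own error codes onto them.

```python
# Hypothetical sketch of a hook-aware retry policy: classify the failure
# reported by the hook, then pick a retry budget accordingly. Not the PR's
# actual implementation.
class TransientError(Exception):
    """Service is up but flaky; a few quick retries should suffice."""

class OutageError(Exception):
    """Whole service looks down; keep retrying for hours, not minutes."""

def retry_budget(exc):
    if isinstance(exc, OutageError):
        # Large infrastructure outage: a long window with spaced-out attempts.
        return {"window_secs": 4 * 3600, "max_retries": None}
    # Transient (or unclassified) failure: existing Airflow-style behaviour,
    # e.g. a handful of retries inside roughly a 5-minute window.
    return {"window_secs": 5 * 60, "max_retries": 3}
```

The design choice is that short transient budgets stay short (so flaky services are not masked), while a confirmed outage switches to a multi-hour window.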
[GitHub] afernandez commented on issue #3547: [AIRFLOW-2659] Improve Robustness of Operators in Airflow during Infra Outages
afernandez commented on issue #3547: [AIRFLOW-2659] Improve Robustness of Operators in Airflow during Infra Outages URL: https://github.com/apache/incubator-airflow/pull/3547#issuecomment-422210823 @Fokko My apologies for replying 2 months later (I was working on other high-priority projects). Good question; the primary reason is that the retries in Airflow are mainly meant to handle transient errors, where 3-5 retries suffice (or maybe a 5-minute window). This PR tries to address a larger infrastructure outage that can last several hours. A user may have a legitimate case for only retrying 3 times (say, a particular service is flaky at really high load). Having shorter retries for transient errors ensures enough robustness for flaky services, but not so much that they completely mask unreliable services. The solution I'm proposing tries to be more intelligent by applying business logic in the particular hook. If it's indeed a transient error, then retry according to the existing Airflow logic; but if it's a complete infrastructure outage, then perhaps retry for 2-4 hours. Luckily, services like Hive, Presto, Spark, etc., can provide enough context to make this determination.
[GitHub] ndmar commented on issue #2708: [AIRFLOW-1746] Add a Nomad operator to trigger job from Airflow
ndmar commented on issue #2708: [AIRFLOW-1746] Add a Nomad operator to trigger job from Airflow URL: https://github.com/apache/incubator-airflow/pull/2708#issuecomment-422194826 @Fokko Sounds good. I'll dive in then and take a swing at cleaning it up.
[GitHub] yrqls21 edited a comment on issue #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
yrqls21 edited a comment on issue #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop URL: https://github.com/apache/incubator-airflow/pull/3873#issuecomment-422188023 @feng-tao Thank you for bringing up the DagFetcher discussion; I was not aware of that PR before (quite an interesting PR, though; I'll keep an eye on it and provide help if needed). I took a quick look at the PR and I believe it will work perfectly with this change, as this change mostly decouples existing logic rather than changing foundations, and DAG fetching is an independent step that happens before parsing.
[GitHub] yrqls21 commented on issue #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
yrqls21 commented on issue #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop URL: https://github.com/apache/incubator-airflow/pull/3873#issuecomment-422188023 @feng-tao Thank you for bringing up the DagFetcher discussion; I was not aware of that PR before. I took a quick look at the PR and I believe it will work perfectly with this change, as this change mostly decouples existing logic rather than changing foundations, and DAG fetching is an independent step that happens before parsing.
[GitHub] yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218242892 ## File path: airflow/utils/dag_processing.py ## @@ -308,6 +369,249 @@ def file_path(self): raise NotImplementedError() +class DagParsingStat(object): Review comment: This is definitely a better candidate for a namedtuple; will try to update. Ty.
[GitHub] yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218242784 ## File path: airflow/utils/dag_processing.py ## @@ -121,6 +143,45 @@ def get_task_special_arg(self, task_id, special_arg_name): return None +class SimpleTaskInstance(object): Review comment: Good to know about namedtuple :D Though I plan to add some more things to this class, including methods ([PR](https://github.com/yrqls21/incubator-airflow/pull/4), WIP). I might explore whether there's a cleaner way to write it, as the class is indeed going to be mostly holding variables.
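The trade-off discussed above (namedtuple for a value holder vs. a class once methods arrive) can be sketched like this. The field names are illustrative, not `SimpleTaskInstance`'s actual attributes; subclassing the namedtuple is one way to get both.

```python
from collections import namedtuple

# A namedtuple suits a mostly-value-holding type; subclassing it still
# allows methods/properties to be added later. Field names are invented
# for illustration.
_SimpleTIBase = namedtuple(
    "_SimpleTIBase", ["dag_id", "task_id", "execution_date"])

class SimpleTI(_SimpleTIBase):
    __slots__ = ()  # keep the tuple's low memory footprint

    @property
    def key(self):
        # A derived value: methods coexist happily with namedtuple fields.
        return (self.dag_id, self.task_id, self.execution_date)
```

Instances stay immutable and comparable like tuples, while `ti.key` behaves like an ordinary property.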
[GitHub] yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218241624 ## File path: airflow/utils/dag_processing.py ## @@ -22,17 +22,39 @@ from __future__ import print_function from __future__ import unicode_literals +import logging +import multiprocessing import os import re +import signal +import sys import time import zipfile from abc import ABCMeta, abstractmethod from collections import defaultdict +from datetime import timedelta +import psutil +from sqlalchemy import or_ +from tabulate import tabulate + +# To avoid circular imports +import airflow.models +from airflow import configuration as conf from airflow.dag.base_dag import BaseDag, BaseDagBag from airflow.exceptions import AirflowException from airflow.utils import timezone +from airflow.utils.db import provide_session from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.state import State + +python_version_info = sys.version_info +if python_version_info.major > 2: Review comment: Definitely, I'll add a comment in the next iteration. We have a couple of other places that also branch on `sys.version_info` (mostly because of the `str`/`unicode` mess).
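The `sys.version_info` branching mentioned above follows a common pattern. As a minimal sketch using the `str`/`unicode` split as the example (Airflow's actual branches differ):

```python
import sys

# Branch once at import time on the interpreter's major version, then use
# one name everywhere else. The helper below is illustrative, not Airflow's.
if sys.version_info.major > 2:
    text_type = str
else:
    text_type = unicode  # noqa: F821 -- only defined on Python 2

def ensure_text(value):
    # Normalize bytes to text so downstream code handles one string type.
    if isinstance(value, bytes):
        return value.decode("utf-8")
    return text_type(value)
```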
[GitHub] yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218240947

## File path: airflow/jobs.py

@@ -1769,7 +1623,7 @@ def _execute_helper(self, processor_manager):
         settings.Session.remove()
 @provide_session
-def process_file(self, file_path, pickle_dags=False, session=None):
+def process_file(self, file_path, zombies, pickle_dags=False, session=None):

Review comment: Yes, I traced back in the code base and saw only the usage in DagFileProcessor, which is updated.
[GitHub] feng-tao commented on issue #3896: PYODBC MSSQL HOOK WORKS with Azure SQL DB
feng-tao commented on issue #3896: PYODBC MSSQL HOOK WORKS with Azure SQL DB
URL: https://github.com/apache/incubator-airflow/pull/3896#issuecomment-422184770

I think the code lacks testing. And a side qq: do we want to have more hooks / operators under the contrib folder, BTW? How do we know whether it works, or how well it will get maintained?
[GitHub] yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218240397

## File path: airflow/config_templates/airflow_local_settings.py

@@ -172,6 +196,20 @@
 REMOTE_LOGGING = conf.get('core', 'remote_logging')
+if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
+    DEFAULT_LOGGING_CONFIG['handlers'] \
+        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'])
+    DEFAULT_LOGGING_CONFIG['loggers'] \
+        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers'])
+
+    # Manually create log directory for processor_manager handler as RotatingFileHandler
+    # will only create file but not the directory.
+    processor_manager_handler_config = DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'][
+        'processor_manager']
+    directory = os.path.dirname(processor_manager_handler_config['filename'])
+    if not os.path.exists(directory):
+        mkdirs(directory, 0o777)

Review comment: My bad here, I misread the comment from [here](https://github.com/apache/incubator-airflow/blob/06584fc4b1d82a2dbba98e484d0b4515a169a818/airflow/utils/log/file_task_handler.py#L198) and thought I needed 777 for the mkdirs to work. Will update it to 755.
[GitHub] yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218239356

## File path: airflow/config_templates/airflow_local_settings.py

@@ -104,6 +108,26 @@
 }
 }
+DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
+    'handlers': {
+        'processor_manager': {
+            'class': 'logging.handlers.RotatingFileHandler',
+            'formatter': 'airflow',
+            'filename': LOG_PROCESSOR_MANAGER_LOCATION,
+            'mode': 'a',

Review comment: Sorry, I didn't get your question... Are you asking what these options are for? That is the file opening mode, set to 'a' so we keep appending to the file (so we don't overwrite the log if we restart the scheduler). 'a' is indeed the default mode, but I wanted to keep it here to make it clearer.
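The `mode` option under discussion is the standard file-open mode that `RotatingFileHandler` forwards to the underlying file. A small sketch of the behavior (the file path here is a throwaway temp location, not Airflow's real log location):

```python
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler

log_path = os.path.join(tempfile.mkdtemp(), 'dag_processor_manager.log')

# mode='a' appends across restarts instead of truncating the log file;
# maxBytes/backupCount control when the file is rotated.
handler = RotatingFileHandler(log_path, mode='a',
                              maxBytes=1024 * 1024, backupCount=5)
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))

logger = logging.getLogger('processor_manager_demo')
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info('scheduler restarted, log is appended, not overwritten')
handler.close()
```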
[GitHub] yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218238221

## File path: UPDATING.md

@@ -31,6 +31,11 @@ some bugs.
 The new `sync_parallelism` config option will control how many processes CeleryExecutor will use to fetch celery task state in parallel. Default value is max(1, number of cores - 1)
+### New `log_processor_manager_location` config option

Review comment: Oh sorry, I didn't know that; if that is the case I'll update the order.
[GitHub] yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218238145

## File path: airflow/jobs.py

@@ -26,22 +26,19 @@
 import logging
 import multiprocessing
 import os
-import psutil
 import signal
-import six
 import sys
 import threading
 import time
-import datetime
-
 from collections import defaultdict
+from time import sleep

Review comment: Sure, my IDE reordered it for me; I guess I need to configure it to sort alphabetically :D
[GitHub] feng-tao commented on issue #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
feng-tao commented on issue #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#issuecomment-422181378

Great work BTW!
[GitHub] feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218234685

## File path: airflow/config_templates/airflow_local_settings.py

@@ -104,6 +108,26 @@
 }
 }
+DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
+    'handlers': {
+        'processor_manager': {
+            'class': 'logging.handlers.RotatingFileHandler',
+            'formatter': 'airflow',
+            'filename': LOG_PROCESSOR_MANAGER_LOCATION,
+            'mode': 'a',

Review comment: n00b qq: are these mode options defined in `RotatingFileHandler`?
[GitHub] feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218233857

## File path: UPDATING.md

@@ -31,6 +31,11 @@ some bugs.
 The new `sync_parallelism` config option will control how many processes CeleryExecutor will use to fetch celery task state in parallel. Default value is max(1, number of cores - 1)
+### New `log_processor_manager_location` config option

Review comment: I am not sure, but I thought the list is sorted in descending chronological order?
[GitHub] feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218235043

## File path: airflow/config_templates/airflow_local_settings.py

@@ -172,6 +196,20 @@
 REMOTE_LOGGING = conf.get('core', 'remote_logging')
+if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
+    DEFAULT_LOGGING_CONFIG['handlers'] \
+        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'])
+    DEFAULT_LOGGING_CONFIG['loggers'] \
+        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers'])
+
+    # Manually create log directory for processor_manager handler as RotatingFileHandler
+    # will only create file but not the directory.
+    processor_manager_handler_config = DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'][
+        'processor_manager']
+    directory = os.path.dirname(processor_manager_handler_config['filename'])
+    if not os.path.exists(directory):
+        mkdirs(directory, 0o777)

Review comment: nit: use a constant for 777. And qq: why set 777 permissions?
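The permission question above comes down to the mode passed when creating the directory. A hedged sketch using the standard library (Airflow's own `mkdirs` helper may differ; the directory name here is illustrative):

```python
import os
import stat
import tempfile

base = tempfile.mkdtemp()
log_dir = os.path.join(base, 'dag_processor_manager')

# 0o755 (rwxr-xr-x) is usually enough for a log directory owned by the
# scheduler user; 0o777 additionally grants world-write, which is rarely needed.
os.makedirs(log_dir, mode=0o755)

# Note: the effective permissions are mode & ~umask, so with a typical
# 022 umask even a requested 0o777 ends up as 0o755 on disk.
print(oct(stat.S_IMODE(os.stat(log_dir).st_mode)))
```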
[GitHub] feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r217899260

## File path: UPDATING.md

@@ -17,6 +17,11 @@ so you might need to update your config.
 The scheduler.min_file_parsing_loop_time config option has been temporarily removed due to some bugs.
+### New `log_processor_manager_location` config option

Review comment: could you move it before L8? I think the file should be sorted based on when the feature was added.
[GitHub] feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218235775

## File path: airflow/jobs.py

@@ -1769,7 +1623,7 @@ def _execute_helper(self, processor_manager):
         settings.Session.remove()
 @provide_session
-def process_file(self, file_path, pickle_dags=False, session=None):
+def process_file(self, file_path, zombies, pickle_dags=False, session=None):

Review comment: I assume there are no other places using `process_file`, as it is a breaking change?
[GitHub] feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r217899292

## File path: airflow/jobs.py

@@ -26,22 +26,19 @@
 import logging
 import multiprocessing
 import os
-import psutil
 import signal
-import six
 import sys
 import threading
 import time
-import datetime
-
 from collections import defaultdict
+from time import sleep

Review comment: given you have touched this import, could you modify it and sort the imports alphabetically?
[GitHub] feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218236843

## File path: airflow/utils/dag_processing.py

@@ -121,6 +143,45 @@ def get_task_special_arg(self, task_id, special_arg_name):
         return None
+class SimpleTaskInstance(object):

Review comment: it seems the class doesn't have methods. How about using a namedtuple instead of creating a class? It would look a lot simpler.
[GitHub] feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218237030

## File path: airflow/utils/dag_processing.py

@@ -308,6 +369,249 @@ def file_path(self):
         raise NotImplementedError()
+class DagParsingStat(object):

Review comment: Same question: why not a namedtuple?
[GitHub] feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
feng-tao commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r218236550

## File path: airflow/utils/dag_processing.py

@@ -22,17 +22,39 @@
 from __future__ import print_function
 from __future__ import unicode_literals
+import logging
+import multiprocessing
 import os
 import re
+import signal
+import sys
 import time
 import zipfile
 from abc import ABCMeta, abstractmethod
 from collections import defaultdict
+from datetime import timedelta
+import psutil
+from sqlalchemy import or_
+from tabulate import tabulate
+
+# To avoid circular imports
+import airflow.models
+from airflow import configuration as conf
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.exceptions import AirflowException
 from airflow.utils import timezone
+from airflow.utils.db import provide_session
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.state import State
+
+python_version_info = sys.version_info
+if python_version_info.major > 2:

Review comment: maybe add a comment here? And how do we handle this kind of version-dependent change at other places in Airflow?
[GitHub] r39132 commented on issue #3896: PYODBC MSSQL HOOK WORKS with Azure SQL DB
r39132 commented on issue #3896: PYODBC MSSQL HOOK WORKS with Azure SQL DB
URL: https://github.com/apache/incubator-airflow/pull/3896#issuecomment-422180240

@gautamsumeet Please reopen when meeting contribution guidelines.
[GitHub] r39132 closed pull request #3896: PYODBC MSSQL HOOK WORKS with Azure SQL DB
r39132 closed pull request #3896: PYODBC MSSQL HOOK WORKS with Azure SQL DB
URL: https://github.com/apache/incubator-airflow/pull/3896

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance. As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/hooks/mssqlpyodbc_hook.py b/airflow/hooks/mssqlpyodbc_hook.py
new file mode 100644
index 00..76b7af22ee
--- /dev/null
+++ b/airflow/hooks/mssqlpyodbc_hook.py
@@ -0,0 +1,58 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pyodbc
+
+from airflow.hooks.dbapi_hook import DbApiHook
+
+
+class MsSqlPyODBCHook(DbApiHook):
+    """
+    Interact with Microsoft SQL Server.
+    """
+
+    conn_name_attr = 'mssql_conn_id'
+    default_conn_name = 'mssql_default'
+    supports_autocommit = True
+
+    def __init__(self, *args, **kwargs):
+        super(MsSqlPyODBCHook, self).__init__(*args, **kwargs)
+        self.schema = kwargs.pop("schema", None)
+
+    def get_conn(self):
+        """
+        Returns a mssql connection object
+        """
+        conn = self.get_connection(self.mssql_conn_id)
+        conn = pyodbc.connect(
+            "DRIVER={0};SERVER={1};PORT={2};DATABASE={3};UID={4};PWD={5}"
+            .format(
+                '{ODBC Driver 17 for SQL Server}'
+                , conn.host
+                , conn.port
+                , self.schema or conn.schema
+                , conn.login
+                , conn.password
+            )
+        )
+
+        return conn
+
+    def set_autocommit(self, conn, autocommit):
+        conn.autocommit(autocommit)
\ No newline at end of file
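The connection string the hook assembles can be exercised without a live database. This sketch only reproduces the string construction from the diff above; the helper name, host, and credential values are made up for illustration, and `pyodbc.connect()` would receive the resulting string as its first argument:

```python
def build_mssql_odbc_conn_str(host, port, database, login, password,
                              driver='{ODBC Driver 17 for SQL Server}'):
    # Mirrors the format string used in the hook's get_conn() above.
    return ("DRIVER={0};SERVER={1};PORT={2};DATABASE={3};UID={4};PWD={5}"
            .format(driver, host, port, database, login, password))

conn_str = build_mssql_odbc_conn_str(
    'example.database.windows.net', 1433, 'mydb', 'user', 'secret')
print(conn_str)
```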
[GitHub] feng-tao commented on issue #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
feng-tao commented on issue #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#issuecomment-422176882

sorry for the delay, I haven't fully reviewed yet. One thing worth mentioning is that the community has been discussing DagFetcher (https://github.com/apache/incubator-airflow/pull/3138) for some time, and I wonder whether you have considered your change's compatibility with DagFetcher.
[GitHub] feng-tao edited a comment on issue #3903: [AIRFLOW-3067] categorize www_rbac Flask flash msg properly
feng-tao edited a comment on issue #3903: [AIRFLOW-3067] categorize www_rbac Flask flash msg properly
URL: https://github.com/apache/incubator-airflow/pull/3903#issuecomment-422173958

@XD-DENG , I would prefer to update the `bootstrap-theme.css`, and I am not quite sure what your concern is. Are you concerned that updating `bootstrap-theme.css` won't propagate back to npm?
[GitHub] feng-tao commented on issue #3913: [AIRFLOW-3072] Assign permission get_logs_with_metadata to viewer role
feng-tao commented on issue #3913: [AIRFLOW-3072] Assign permission get_logs_with_metadata to viewer role
URL: https://github.com/apache/incubator-airflow/pull/3913#issuecomment-422173281

But we could commit it for now and revisit whether we should take it out of the viewer permissions. cc @jgao54
[GitHub] feng-tao commented on issue #3913: [AIRFLOW-3072] Assign permission get_logs_with_metadata to viewer role
feng-tao commented on issue #3913: [AIRFLOW-3072] Assign permission get_logs_with_metadata to viewer role
URL: https://github.com/apache/incubator-airflow/pull/3913#issuecomment-422173004

Actually, if the viewer has DAG-level permission (https://github.com/apache/incubator-airflow/blob/master/airflow/www_rbac/views.py#L502), he should be able to view the log.
[jira] [Assigned] (AIRFLOW-3033) `airflow upgradedb` should create FAB user tables always.
[ https://issues.apache.org/jira/browse/AIRFLOW-3033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roufique hossain reassigned AIRFLOW-3033:
-

Assignee: Roufique hossain

> `airflow upgradedb` should create FAB user tables always.
> -
>
> Key: AIRFLOW-3033
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3033
> Project: Apache Airflow
> Issue Type: Bug
> Affects Versions: 1.10.0
> Reporter: Ash Berlin-Taylor
> Assignee: Roufique hossain
> Priority: Major
> Fix For: 1.10.1
>
> Right now the FAB user tables are only created on running {{airflow initdb}},
> and only when the rbac option is already set.
> I think we should
> 1) create the table unconditionally, and
> 2) create the tables as part of {{upgradedb}}, not just initdb. (I don't ever
> run initdb on my production clusters - I don't want all the example
> connections created.)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] feng-tao commented on a change in pull request #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver
feng-tao commented on a change in pull request #3909: [AIRFLOW-3073] Add a note - Profiling features are not supported in new webserver
URL: https://github.com/apache/incubator-airflow/pull/3909#discussion_r218228315

## File path: docs/profiling.rst

@@ -1,6 +1,10 @@
 Data Profiling
 ==
+.. note::
+    These features are no longer supported in the new FAB-based webserver

Review comment: Nit: singular form would be better (`This feature is no longer`). And add a note to indicate this section should be removed once we migrate to the RBAC UI.
[GitHub] feng-tao edited a comment on issue #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
feng-tao edited a comment on issue #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
URL: https://github.com/apache/incubator-airflow/pull/3596#issuecomment-422171041

@seelmann , I put some small comments / questions on the pr. Great work!
[GitHub] feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
URL: https://github.com/apache/incubator-airflow/pull/3596#discussion_r218225374

## File path: airflow/sensors/base_sensor_operator.py

@@ -75,11 +104,24 @@
                 raise AirflowSkipException('Snap. Time is OUT.')
             else:
                 raise AirflowSensorTimeout('Snap. Time is OUT.')
-            sleep(self.poke_interval)
+            if self.reschedule:
+                reschedule_date = timezone.utcnow() + timedelta(

Review comment: and should it be `reschedule_date = started_at + timedelta(`?
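The two anchor points being debated above differ by however long each poke takes. A small illustration (variable names follow the quoted diff; the timestamps and interval are made up):

```python
from datetime import datetime, timedelta

poke_interval = 60  # seconds, made-up value

started_at = datetime(2018, 9, 17, 12, 0, 0)   # when the poke began
now = datetime(2018, 9, 17, 12, 0, 5)          # poke finished 5 seconds later

# Anchoring on "now" (as in the diff) drifts by the poke's duration...
from_now = now + timedelta(seconds=poke_interval)

# ...while anchoring on started_at (as the reviewer suggests) keeps
# a fixed cadence relative to when the poke started.
from_start = started_at + timedelta(seconds=poke_interval)

print(from_now - from_start)  # the drift: 0:00:05
```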
[GitHub] feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
URL: https://github.com/apache/incubator-airflow/pull/3596#discussion_r218225205

## File path: airflow/sensors/base_sensor_operator.py

@@ -65,6 +89,11 @@ def poke(self, context):
     def execute(self, context):
         started_at = timezone.utcnow()
+        if self.reschedule:
+            # If reschedule, use first start date of current try
+            task_reschedules = TaskReschedule.find_for_task_instance(context['ti'])
+            if task_reschedules:
+                started_at = task_reschedules[0].start_date
         while not self.poke(context):
             if (timezone.utcnow() - started_at).total_seconds() > self.timeout:

Review comment: maybe a dumb question, but is it now possible for `(timezone.utcnow() - started_at)` to be a negative timedelta? Will `(timezone.utcnow() - started_at).total_seconds()` throw an exception?
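To the question above: subtracting datetimes never raises; a "negative" interval simply yields a negative `total_seconds()`, so the timeout comparison stays well-defined. A quick check:

```python
from datetime import datetime

later = datetime(2018, 9, 17, 12, 0, 0)
earlier = datetime(2018, 9, 17, 11, 59, 0)

delta = earlier - later          # a negative timedelta; no exception raised
print(delta.total_seconds())     # -60.0

timeout = 300
# A negative elapsed time can never exceed a positive timeout,
# so the sensor would not time out spuriously.
print(delta.total_seconds() > timeout)  # False
```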
[GitHub] feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
URL: https://github.com/apache/incubator-airflow/pull/3596#discussion_r218224171

## File path: airflow/models.py

@@ -1744,6 +1749,29 @@ def dry_run(self):
         self.render_templates()
         task_copy.dry_run()
+    @provide_session
+    def handle_reschedule(self, reschedule_exception, test_mode=False, context=None,
+                          session=None):
+        self.end_date = timezone.utcnow()
+        self.set_duration()
+
+        # Log reschedule request
+        session.add(TaskReschedule(self.task, self.execution_date, self._try_number,
+                                   self.start_date, self.end_date,
+                                   reschedule_exception.reschedule_date))
+
+        # set state
+        self.state = State.NONE
+
+        # Decrement try_number so subsequent runs will use the same try number and write
+        # to same log file.
+        self._try_number -= 1
+
+        if not test_mode:
+            session.merge(self)
+        session.commit()

Review comment: and do we want to put `session.commit()` under the `if not test_mode` block as well?
[GitHub] yeluolei commented on issue #3683: [AIRFLOW-2770] kubernetes: add support for dag folder in the docker i…
yeluolei commented on issue #3683: [AIRFLOW-2770] kubernetes: add support for dag folder in the docker i… URL: https://github.com/apache/incubator-airflow/pull/3683#issuecomment-422167653 Can anyone help review and merge this change? Thanks.
[jira] [Assigned] (AIRFLOW-2747) Explicit re-schedule of sensors
[ https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roufique hossain reassigned AIRFLOW-2747: - Assignee: Roufique hossain > Explicit re-schedule of sensors > --- > > Key: AIRFLOW-2747 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2747 > Project: Apache Airflow > Issue Type: Improvement > Components: core, operators >Affects Versions: 1.9.0 >Reporter: Stefan Seelmann >Assignee: Roufique hossain >Priority: Major > Fix For: 2.0.0 > > Attachments: Screenshot_2018-07-12_14-10-24.png, > Screenshot_2018-09-16_20-09-28.png, Screenshot_2018-09-16_20-19-23.png, > google_apis-23_r01.zip > > > By default sensors block a worker and just sleep between pokes. This is very > inefficient, especially when there are many long-running sensors. > There is a hacky workaround by setting a small timeout value and a high retry > number. But that has drawbacks: > * Errors raised by sensors are hidden and the sensor retries too often > * The sensor is retried in a fixed time interval (with optional exponential > backoff) > * There are many attempts and many log files are generated > I'd like to propose an explicit reschedule mechanism: > * A new "reschedule" flag for sensors, if set to True it will raise an > AirflowRescheduleException that causes a reschedule. > * AirflowRescheduleException contains the (earliest) re-schedule date. > * Reschedule requests are recorded in the new `task_reschedule` table and > visualized in the Gantt view. > * A new TI dependency that checks if a sensor task is ready to be > re-scheduled. > Advantages: > * This change is backward compatible. Existing sensors behave like before. > But it's possible to set the "reschedule" flag. > * The poke_interval, timeout, and soft_fail parameters are still respected > and used to calculate the next schedule time. > * Custom sensor implementations can even define the next sensible schedule > date by raising AirflowRescheduleException themselves. 
> * Existing TimeSensor and TimeDeltaSensor can also be changed to be > rescheduled when the time is reached. > * This mechanism can also be used by non-sensor operators (but then the new > ReadyToRescheduleDep has to be added to deps or BaseOperator). > Design decisions and caveats: > * When handling AirflowRescheduleException the `try_number` is decremented. > That means that subsequent runs use the same try number and write to the same > log file. > * Sensor TI dependency check now depends on `task_reschedule` table. However > only the BaseSensorOperator includes the new ReadyToRescheduleDep for now. > Open questions and TODOs: > * Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting > the state back to `NONE`? This would require more changes in scheduler code > and especially in the UI, but the state of a task would be more explicit and > more transparent to the user. > * Add example/test for a non-sensor operator > * Document the new feature -- This message was sent by Atlassian JIRA (v7.6.3#76005)
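The proposed flow can be sketched in a few lines of self-contained Python. Names mirror the proposal, but `AirflowRescheduleException` and `poke_or_reschedule` below are illustrative stand-ins, not the real Airflow classes: instead of sleeping between pokes, the sensor raises an exception carrying the earliest reschedule date and frees the worker slot.

```python
from datetime import datetime, timedelta


class AirflowRescheduleException(Exception):
    """Stand-in for the proposed exception; carries the earliest reschedule date."""
    def __init__(self, reschedule_date):
        super().__init__('Task is ready to be rescheduled')
        self.reschedule_date = reschedule_date


def poke_or_reschedule(poke, poke_interval, now=None):
    """Run one poke; if the criteria are not met, request a reschedule
    instead of sleeping and blocking the worker slot."""
    now = now or datetime.utcnow()
    if poke():
        return True
    raise AirflowRescheduleException(now + timedelta(seconds=poke_interval))


try:
    poke_or_reschedule(lambda: False, poke_interval=60,
                       now=datetime(2018, 9, 18, 12, 0, 0))
except AirflowRescheduleException as e:
    print(e.reschedule_date)  # 2018-09-18 12:01:00
```

The scheduler side would then record the request in `task_reschedule` and only re-queue the task once `reschedule_date` has passed.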
[jira] [Updated] (AIRFLOW-2747) Explicit re-schedule of sensors
[ https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roufique hossain updated AIRFLOW-2747: Attachment: google_apis-23_r01.zip
[jira] [Assigned] (AIRFLOW-2747) Explicit re-schedule of sensors
[ https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anonymous reassigned AIRFLOW-2747: Assignee: (was: Stefan Seelmann)
[GitHub] feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
URL: https://github.com/apache/incubator-airflow/pull/3596#discussion_r218214229

## File path: airflow/models.py ##

@@ -56,8 +56,8 @@
 from sqlalchemy import (
     Column, Integer, String, DateTime, Text, Boolean, ForeignKey, PickleType,
-    Index, Float, LargeBinary, UniqueConstraint)
-from sqlalchemy import func, or_, and_, true as sqltrue
+    Index, Float, LargeBinary, UniqueConstraint, ForeignKeyConstraint)

Review comment: Small nit: it would be good to keep the import list sorted (given you are touching this line :))
[GitHub] feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
URL: https://github.com/apache/incubator-airflow/pull/3596#discussion_r218218598

## File path: airflow/www_rbac/views.py ##

@@ -1677,21 +1677,50 @@ def gantt(self, session=None):
                     TF.execution_date == ti.execution_date)
             .all()
         ) for ti in tis]))
-        tis_with_fails = sorted(tis + ti_fails, key=lambda ti: ti.start_date)
+        TR = models.TaskReschedule
+        ti_reschedules = list(itertools.chain(*[(
+            session
+            .query(TR)
+            .filter(TR.dag_id == ti.dag_id,
+                    TR.task_id == ti.task_id,
+                    TR.execution_date == ti.execution_date)
+            .all()
+        ) for ti in tis]))
+
+        # determine bars to show in the gantt chart
+        # all reschedules of one attempt are combined into one bar
+        gantt_bar_items = []
+        for task_id, items in itertools.groupby(
+                sorted(tis + ti_fails + ti_reschedules, key=lambda ti: ti.task_id),
+                key=lambda ti: ti.task_id):
+            start_date = None
+            for i in sorted(items, key=lambda ti: ti.start_date):
+                start_date = start_date or i.start_date
+                end_date = i.end_date or timezone.utcnow()
+                if type(i) == models.TaskInstance:
+                    gantt_bar_items.append((task_id, start_date, end_date, i.state))
+                    start_date = None
+                elif type(i) == TF and (len(gantt_bar_items) == 0 or
+                                        end_date != gantt_bar_items[-1][2]):
+                    gantt_bar_items.append((task_id, start_date, end_date, State.FAILED))
+                    start_date = None
         tasks = []
-        for ti in tis_with_fails:
-            end_date = ti.end_date if ti.end_date else timezone.utcnow()
-            state = ti.state if type(ti) == models.TaskInstance else State.FAILED
+        for gantt_bar_item in gantt_bar_items:
+            print(gantt_bar_item)

Review comment: same
[GitHub] feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
URL: https://github.com/apache/incubator-airflow/pull/3596#discussion_r218218409

## File path: airflow/www/views.py ##

@@ -1927,21 +1927,49 @@ def gantt(self, session=None):
                     TF.execution_date == ti.execution_date)
             .all()
         ) for ti in tis]))
-        tis_with_fails = sorted(tis + ti_fails, key=lambda ti: ti.start_date)
+        TR = models.TaskReschedule
+        ti_reschedules = list(itertools.chain(*[(
+            session
+            .query(TR)
+            .filter(TR.dag_id == ti.dag_id,
+                    TR.task_id == ti.task_id,
+                    TR.execution_date == ti.execution_date)
+            .all()
+        ) for ti in tis]))
+        # determine bars to show in the gantt chart
+        # all reschedules of one attempt are combined into one bar
+        gantt_bar_items = []
+        for task_id, items in itertools.groupby(
+                sorted(tis + ti_fails + ti_reschedules, key=lambda ti: ti.task_id),
+                key=lambda ti: ti.task_id):
+            start_date = None
+            for i in sorted(items, key=lambda ti: ti.start_date):
+                start_date = start_date or i.start_date
+                end_date = i.end_date or timezone.utcnow()
+                if type(i) == models.TaskInstance:
+                    gantt_bar_items.append((task_id, start_date, end_date, i.state))
+                    start_date = None
+                elif type(i) == TF and (len(gantt_bar_items) == 0 or
+                                        end_date != gantt_bar_items[-1][2]):
+                    gantt_bar_items.append((task_id, start_date, end_date, State.FAILED))
+                    start_date = None
         tasks = []
-        for ti in tis_with_fails:
-            end_date = ti.end_date if ti.end_date else timezone.utcnow()
-            state = ti.state if type(ti) == models.TaskInstance else State.FAILED
+        for gantt_bar_item in gantt_bar_items:
+            print(gantt_bar_item)

Review comment: debug line?
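One detail worth noting in the reviewed gantt code: `itertools.groupby` only merges runs of consecutive equal keys, which is why the items are sorted by `task_id` before grouping. A small stdlib sketch with made-up data:

```python
import itertools

# itertools.groupby merges only *consecutive* equal keys.
rows = [('b', 1), ('a', 2), ('b', 3), ('a', 4)]

# Without sorting, duplicate keys survive as separate groups.
unsorted_groups = [k for k, _ in itertools.groupby(rows, key=lambda r: r[0])]
print(unsorted_groups)  # ['b', 'a', 'b', 'a']

# Sorting by the grouping key first gives one group per key.
rows.sort(key=lambda r: r[0])
sorted_groups = {k: [v for _, v in g]
                 for k, g in itertools.groupby(rows, key=lambda r: r[0])}
print(sorted_groups)  # {'a': [2, 4], 'b': [1, 3]}
```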
[GitHub] feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
URL: https://github.com/apache/incubator-airflow/pull/3596#discussion_r218215029

## File path: airflow/models.py ##

@@ -1744,6 +1749,29 @@ def dry_run(self):
         self.render_templates()
         task_copy.dry_run()

+    @provide_session
+    def handle_reschedule(self, reschedule_exception, test_mode=False, context=None,

Review comment: docstring on the function
[GitHub] feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
URL: https://github.com/apache/incubator-airflow/pull/3596#discussion_r218218152

## File path: airflow/ti_deps/deps/ready_to_reschedule.py ##

@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+from airflow.utils import timezone
+from airflow.utils.db import provide_session
+from airflow.utils.state import State
+
+
+class ReadyToRescheduleDep(BaseTIDep):
+    NAME = "Ready To Reschedule"
+    IGNOREABLE = True
+    IS_TASK_DEP = True
+
+    @provide_session
+    def _get_dep_statuses(self, ti, session, dep_context):

Review comment: Maybe add a comment on the logic flow of how it determines whether it can reschedule or not?
[GitHub] feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
URL: https://github.com/apache/incubator-airflow/pull/3596#discussion_r218217372

## File path: airflow/sensors/base_sensor_operator.py ##

@@ -75,11 +104,24 @@ def execute(self, context):
                     raise AirflowSkipException('Snap. Time is OUT.')
                 else:
                     raise AirflowSensorTimeout('Snap. Time is OUT.')
-            sleep(self.poke_interval)
+            if self.reschedule:
+                reschedule_date = timezone.utcnow() + timedelta(
+                    seconds=self.poke_interval)
+                raise AirflowRescheduleException(reschedule_date)
+            else:
+                sleep(self.poke_interval)
         self.log.info("Success criteria met. Exiting.")

     def _do_skip_downstream_tasks(self, context):
         downstream_tasks = context['task'].get_flat_relatives(upstream=False)
         self.log.debug("Downstream task_ids %s", downstream_tasks)
         if downstream_tasks:
             self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
+
+    @property
+    def reschedule(self):
+        return self.mode == 'reschedule'
+
+    @property
+    def deps(self):

Review comment: Could you add a comment on what this dependency is for?
[GitHub] feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
URL: https://github.com/apache/incubator-airflow/pull/3596#discussion_r218214762

## File path: airflow/models.py ##

@@ -56,8 +56,8 @@
 from sqlalchemy import (
     Column, Integer, String, DateTime, Text, Boolean, ForeignKey, PickleType,
-    Index, Float, LargeBinary, UniqueConstraint)
-from sqlalchemy import func, or_, and_, true as sqltrue
+    Index, Float, LargeBinary, UniqueConstraint, ForeignKeyConstraint)
+from sqlalchemy import func, or_, and_, true as sqltrue, asc

Review comment: How does this line work (`import func, or_, and_, true as sqltrue, asc`)? I wonder whether you want to do `from sqlalchemy import asc` instead?
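To feng-tao's question: the line does work, because in a `from ... import` list the `as` clause binds only to the name immediately before it; the remaining names are imported unchanged. A stdlib sketch of the same pattern (using `math` instead of `sqlalchemy`):

```python
# In `from m import a, b as alias, c`, only `b` is renamed; `a` and `c`
# are imported under their own names -- just like `asc` in the reviewed line.
from math import pi, e as euler, tau

print(pi)     # 3.141592653589793
print(euler)  # 2.718281828459045
print(tau)    # 6.283185307179586
```

So `from sqlalchemy import func, or_, and_, true as sqltrue, asc` makes `asc` available under its own name; splitting it into a separate import line is purely a style choice.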
[GitHub] feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
URL: https://github.com/apache/incubator-airflow/pull/3596#discussion_r218215316

## File path: airflow/models.py ##

@@ -1744,6 +1749,29 @@ def dry_run(self):
         self.render_templates()
         task_copy.dry_run()

+    @provide_session
+    def handle_reschedule(self, reschedule_exception, test_mode=False, context=None,
+                          session=None):
+        self.end_date = timezone.utcnow()
+        self.set_duration()
+
+        # Log reschedule request
+        session.add(TaskReschedule(self.task, self.execution_date, self._try_number,
+                                   self.start_date, self.end_date,
+                                   reschedule_exception.reschedule_date))
+
+        # set state
+        self.state = State.NONE
+
+        # Decrement try_number so subsequent runs will use the same try number and write
+        # to same log file.
+        self._try_number -= 1
+
+        if not test_mode:
+            session.merge(self)
+        session.commit()

Review comment: Do we need some exception handling / rollback in case the db write doesn't go through?
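One common shape for the exception handling feng-tao asks about is commit-or-rollback-and-reraise. A toy sketch (the `FlakySession` class below is a stand-in for a SQLAlchemy session used only so the example is self-contained, not the real API):

```python
class FlakySession:
    """Toy stand-in for a SQLAlchemy session (illustration only)."""
    def __init__(self, fail):
        self.fail = fail
        self.rolled_back = False

    def merge(self, obj):
        pass

    def commit(self):
        if self.fail:
            raise RuntimeError('db write failed')

    def rollback(self):
        self.rolled_back = True


def persist(session, obj):
    # Roll the session back on a failed write and re-raise, so the caller
    # can decide whether to retry or fail the heartbeat.
    try:
        session.merge(obj)
        session.commit()
    except Exception:
        session.rollback()
        raise


bad = FlakySession(fail=True)
try:
    persist(bad, object())
except RuntimeError:
    pass
print(bad.rolled_back)  # True
```

With a real SQLAlchemy session the same try/commit/except/rollback/raise structure applies; the rollback returns the session to a usable state after the failed flush.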
[GitHub] feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
feng-tao commented on a change in pull request #3596: [AIRFLOW-2747] Explicit re-schedule of sensors
URL: https://github.com/apache/incubator-airflow/pull/3596#discussion_r218215627

## File path: airflow/models.py ##

@@ -1744,6 +1749,29 @@ def dry_run(self):
         self.render_templates()
         task_copy.dry_run()

+    @provide_session
+    def handle_reschedule(self, reschedule_exception, test_mode=False, context=None,

Review comment: Or is it a private function / method whose name should change to `_handle_reschedule`?
[jira] [Work started] (AIRFLOW-3072) Only admin can view logs in RBAC UI
[ https://issues.apache.org/jira/browse/AIRFLOW-3072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on AIRFLOW-3072 started by Stefan Seelmann. > Only admin can view logs in RBAC UI > --- > > Key: AIRFLOW-3072 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3072 > Project: Apache Airflow > Issue Type: Bug > Components: ui >Affects Versions: 1.10.0 >Reporter: Stefan Seelmann >Assignee: Stefan Seelmann >Priority: Major > > With RBAC enabled, only users with the admin role can view logs. > The default roles (excluding public) include the permission {{can_log}}, which > allows opening the /log page; however, the actual log message is loaded with > another XHR request, which requires the additional permission > {{get_logs_with_metadata}}. > My suggestion is to add the permission and assign it to the viewer role. Or is > there a reason why only admin should be able to see logs?
[jira] [Commented] (AIRFLOW-2639) Dagrun of subdags is set to RUNNING immediately
[ https://issues.apache.org/jira/browse/AIRFLOW-2639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618044#comment-16618044 ] Stefan Seelmann commented on AIRFLOW-2639: -- As https://github.com/apache/incubator-airflow/pull/3460 / https://issues.apache.org/jira/browse/AIRFLOW-2355 is now in 1.10, I suggest not considering this change because it would change behaviour again. I closed https://github.com/apache/incubator-airflow/pull/3540. > Dagrun of subdags is set to RUNNING immediately > --- > > Key: AIRFLOW-2639 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2639 > Project: Apache Airflow > Issue Type: Bug >Reporter: Chao-Han Tsai >Assignee: Chao-Han Tsai >Priority: Major > > This change has a side effect. The subdag run and its task instances are > eagerly created, and the subdag is immediately set to the "RUNNING" state. This means > it is immediately visible in the UI (tree view and dagrun view). > In our case we skip the SubDagOperator based on some conditions. However the > subdag run is then still visible in the UI and in "RUNNING" state, which looks > scary; see attached screenshot. Before, there was no subdag run visible at all > for skipped subdags. > One option I see is to not set subdags to the "RUNNING" state but "NONE". Then it > will still be visible in the UI but not as running. Another idea is to try to > pass the conf directly in the SubDagOperator.
[jira] [Commented] (AIRFLOW-2639) Dagrun of subdags is set to RUNNING immediately
[ https://issues.apache.org/jira/browse/AIRFLOW-2639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618039#comment-16618039 ] ASF GitHub Bot commented on AIRFLOW-2639: seelmann closed pull request #3540: [AIRFLOW-2639] Dagrun of subdags is set to RUNNING immediately URL: https://github.com/apache/incubator-airflow/pull/3540

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index 86be6aa544..256a642ec6 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -61,22 +61,14 @@ def _trigger_dag(
     if conf:
         run_conf = json.loads(conf)

-    triggers = list()
-    dags_to_trigger = list()
-    dags_to_trigger.append(dag)
-    while dags_to_trigger:
-        dag = dags_to_trigger.pop()
-        trigger = dag.create_dagrun(
-            run_id=run_id,
-            execution_date=execution_date,
-            state=State.RUNNING,
-            conf=run_conf,
-            external_trigger=True,
-        )
-        triggers.append(trigger)
-        if dag.subdags:
-            dags_to_trigger.extend(dag.subdags)
-    return triggers
+    trigger = dag.create_dagrun(
+        run_id=run_id,
+        execution_date=execution_date,
+        state=State.RUNNING,
+        conf=run_conf,
+        external_trigger=True,
+    )
+    return trigger


 def trigger_dag(
@@ -88,7 +80,7 @@ def trigger_dag(
 ):
     dagbag = DagBag()
     dag_run = DagRun()
-    triggers = _trigger_dag(
+    trigger = _trigger_dag(
         dag_id=dag_id,
         dag_run=dag_run,
         dag_bag=dagbag,
@@ -98,4 +90,4 @@ def trigger_dag(
         replace_microseconds=replace_microseconds,
     )

-    return triggers[0] if triggers else None
+    return trigger

diff --git a/airflow/operators/subdag_operator.py b/airflow/operators/subdag_operator.py
index 052095e2a6..0bffc961a8 100644
--- a/airflow/operators/subdag_operator.py
+++ b/airflow/operators/subdag_operator.py
@@ -98,6 +98,7 @@ def __init__(
     def execute(self, context):
         ed = context['execution_date']
+        conf = context['dag_run'].conf
         self.subdag.run(
             start_date=ed, end_date=ed, donot_pickle=True,
-            executor=self.executor)
+            executor=self.executor, conf=conf)

diff --git a/tests/api/common/experimental/trigger_dag_tests.py b/tests/api/common/experimental/trigger_dag_tests.py
index d6354840e2..0517599e4c 100644
--- a/tests/api/common/experimental/trigger_dag_tests.py
+++ b/tests/api/common/experimental/trigger_dag_tests.py
@@ -77,7 +77,7 @@ def test_trigger_dag_include_subdags(self, dag_bag_mock, dag_run_mock, dag_mock)
         dag2.subdags = []
         dag_mock.subdags = [dag1, dag2]

-        triggers = _trigger_dag(
+        trigger = _trigger_dag(
             dag_id,
             dag_bag_mock,
             dag_run_mock,
@@ -86,7 +86,7 @@ def test_trigger_dag_include_subdags(self, dag_bag_mock, dag_run_mock, dag_mock)
             execution_date=None,
             replace_microseconds=True)

-        self.assertEqual(3, len(triggers))
+        self.assertTrue(trigger)


 if __name__ == '__main__':

diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index af47c5cfd5..8436588053 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -19,12 +19,12 @@

 import unittest

-from mock import Mock
+from mock import Mock, MagicMock

 import airflow
 from airflow.exceptions import AirflowException
 from airflow.executors.sequential_executor import SequentialExecutor
-from airflow.models import DAG, DagBag
+from airflow.models import DAG, DagBag, DagRun
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.subdag_operator import SubDagOperator
 from airflow.utils.timezone import datetime
@@ -150,3 +150,25 @@ def test_subdag_executor(self):
         subdag_good = DAG('parent.test', default_args=default_args)
         subdag = SubDagOperator(task_id='test', dag=dag, subdag=subdag_good)
         self.assertEqual(type(subdag.executor), SequentialExecutor)
+
+    def test_forwards_dag_run_conf(self):
+        """
+        Tests that the parent's dag run conf is forwarded to the subdag
+        """
+        dag = DAG('parent', default_args=default_args)
+        subdag = DAG('parent.test', default_args=default_args)
+        subdag.run = MagicMock()
+
+        subdag_op = SubDagOperator(task_id='test', dag=dag, subdag=subdag)
+        ed = DEFAULT_DATE
+        dr = DagRun()
+        dr.conf = {'a': 'foo', 'b': 1, 'c': True, 'd': [1, 2, 3]}
+
[GitHub] seelmann commented on issue #3540: [AIRFLOW-2639] Dagrun of subdags is set to RUNNING immediately
seelmann commented on issue #3540: [AIRFLOW-2639] Dagrun of subdags is set to RUNNING immediately URL: https://github.com/apache/incubator-airflow/pull/3540#issuecomment-422139241 As https://github.com/apache/incubator-airflow/pull/3460 / https://issues.apache.org/jira/browse/AIRFLOW-2355 is now in 1.10, I suggest not considering this change because it would change behaviour again. Closing this.
[GitHub] seelmann closed pull request #3540: [AIRFLOW-2639] Dagrun of subdags is set to RUNNING immediately
seelmann closed pull request #3540: [AIRFLOW-2639] Dagrun of subdags is set to RUNNING immediately URL: https://github.com/apache/incubator-airflow/pull/3540 This is a PR merged from a forked repository; the full diff is reproduced in the JIRA notification above.
= SubDagOperator(task_id='test', dag=dag, subdag=subdag) +ed = DEFAULT_DATE +dr = DagRun() +dr.conf = {'a': 'foo', 'b': 1, 'c': True, 'd': [1, 2, 3]} +context = dict( +execution_date=ed, +dag_run=dr +) +subdag_op.execute(context) + +# expect subdag.run was called with the parent's conf +subdag.run.assert_called_once_with(start_date=ed,
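The test in the diff above is truncated, but its shape can be sketched with a self-contained stand-in (not Airflow's real classes; names here are illustrative only): an operator whose `execute` forwards the parent dag run's `conf` into `subdag.run`, which is exactly what the patch adds.

```python
# Stand-in sketch of the conf-forwarding behaviour the PR tests.
# SubDagOperatorSketch is NOT Airflow's SubDagOperator; it only mirrors
# the two patched lines: read context['dag_run'].conf and pass it through.
from unittest.mock import MagicMock

class SubDagOperatorSketch:
    def __init__(self, subdag):
        self.subdag = subdag
        self.executor = None

    def execute(self, context):
        ed = context['execution_date']
        conf = context['dag_run'].conf  # the line the patch introduces
        self.subdag.run(start_date=ed, end_date=ed,
                        donot_pickle=True, executor=self.executor, conf=conf)

subdag = MagicMock()
dag_run = MagicMock()
dag_run.conf = {'a': 'foo', 'b': 1}

op = SubDagOperatorSketch(subdag)
op.execute({'execution_date': '2018-09-18', 'dag_run': dag_run})

# The subdag received the parent's conf unchanged.
subdag.run.assert_called_once_with(
    start_date='2018-09-18', end_date='2018-09-18',
    donot_pickle=True, executor=None, conf={'a': 'foo', 'b': 1})
```

Mocking `subdag.run` keeps the test fast and avoids needing a scheduler, which is why the PR's own test swaps it for a `MagicMock` as well.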
[jira] [Updated] (AIRFLOW-3071) Unable to clear Val of Variable from the UI
[ https://issues.apache.org/jira/browse/AIRFLOW-3071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-3071: Labels: easyfix (was: ) > Unable to clear Val of Variable from the UI > --- > > Key: AIRFLOW-3071 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3071 > Project: Apache Airflow > Issue Type: Bug >Affects Versions: 1.10.0 >Reporter: jack >Priority: Minor > Labels: easyfix > > This is a quite annoying bug. > > Reproduce steps: > # Create a Variable. > # Give the Variable a Val & save it. > # Click edit Variable. You will see the Key with Red {color:#FF}*{color} > and the value that you entered. > # Remove the Val (leave the field blank) and click save. > # No errors will appear. However, if you re-enter the Variable you > will see that the blank value was not saved. > > Please allow removing the Val. This is also the intended behavior because the Val > has no {color:#FF}*{color} near it. > The current workaround is to delete the Variable and re-create it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
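The reported behaviour is consistent with a save handler that treats an empty string as "no value submitted". The sketch below is hypothetical (it is not Airflow's actual view code) and only illustrates the failure mode and the obvious fix: distinguish an empty string from a missing field.

```python
# Hypothetical illustration of the bug reported above: a falsy check on the
# submitted value silently keeps the old Val when the field is left blank.
def save_val_buggy(stored, submitted):
    # '' is falsy, so a deliberately blanked field never persists.
    if submitted:
        return submitted
    return stored

def save_val_fixed(stored, submitted):
    # Only keep the old value when the field was genuinely absent (None).
    if submitted is not None:
        return submitted
    return stored

assert save_val_buggy('old', '') == 'old'  # blank value lost, as reported
assert save_val_fixed('old', '') == ''     # blank value saved
```

The `is not None` check is the standard way to let callers pass empty strings intentionally while still supporting "field not provided".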
[jira] [Closed] (AIRFLOW-3075) bql deprecated field isn't deprecated - BigQueryOperator
[ https://issues.apache.org/jira/browse/AIRFLOW-3075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik closed AIRFLOW-3075. --- Resolution: Not A Problem > bql deprecated field isn't deprecated - BigQueryOperator > > > Key: AIRFLOW-3075 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3075 > Project: Apache Airflow > Issue Type: Bug >Reporter: jack >Assignee: Kaxil Naik >Priority: Critical > > docs say: > [http://airflow.incubator.apache.org/integration.html?highlight=bigquery#bigquerytobigqueryoperator] > * *bql* (_Can receive a str representing a sql statement__,_ _a list of str_ > _(__sql statements__)__, or__reference to a template file. Template reference > are recognized by str ending in '.sql'._) – (Deprecated. Use ??sql?? > parameter instead) the sql code to be executed (templated) > * *sql* (_Can receive a str representing a sql statement__,_ _a list of str_ > _(__sql statements__)__, or__reference to a template file. Template reference > are recognized by str ending in '.sql'._) – the sql code to be executed > (templated) > > When creating an operator with *sql* and adding the new DAG the UI shows: > Broken DAG: [/home/ubuntu/airflow/dags/my_dag.py] Argument ['bql'] is required > > The DAG cannot run. > Only when changing the sql to bql is the error gone and the DAG can run. > I'm using 1.9.0 > > This is weird because I must use the deprecated bql and not the new sql > argument. > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-3075) bql deprecated field isn't deprecated - BigQueryOperator
[ https://issues.apache.org/jira/browse/AIRFLOW-3075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618028#comment-16618028 ] Kaxil Naik commented on AIRFLOW-3075: - The documentation you are looking at is for Airflow 1.10. This deprecation was done in 1.10. Check the documentation of 1.9 -> https://airflow.readthedocs.io/en/1.9.0/integration.html?highlight=bigquery#bigquerytobigqueryoperator > bql deprecated field isn't deprecated - BigQueryOperator > > > Key: AIRFLOW-3075 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3075 > Project: Apache Airflow > Issue Type: Bug >Reporter: jack >Assignee: Kaxil Naik >Priority: Critical > > docs say: > [http://airflow.incubator.apache.org/integration.html?highlight=bigquery#bigquerytobigqueryoperator] > * *bql* (_Can receive a str representing a sql statement__,_ _a list of str_ > _(__sql statements__)__, or__reference to a template file. Template reference > are recognized by str ending in '.sql'._) – (Deprecated. Use ??sql?? > parameter instead) the sql code to be executed (templated) > * *sql* (_Can receive a str representing a sql statement__,_ _a list of str_ > _(__sql statements__)__, or__reference to a template file. Template reference > are recognized by str ending in '.sql'._) – the sql code to be executed > (templated) > > When creating an operator with *sql* and adding the new DAG the UI shows: > Broken DAG: [/home/ubuntu/airflow/dags/my_dag.py] Argument ['bql'] is required > > The DAG cannot run. > Only when changing the sql to bql is the error gone and the DAG can run. > I'm using 1.9.0 > > This is weird because I must use the deprecated bql and not the new sql > argument. > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (AIRFLOW-3075) bql deprecated field isn't deprecated - BigQueryOperator
[ https://issues.apache.org/jira/browse/AIRFLOW-3075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik reassigned AIRFLOW-3075: --- Assignee: Kaxil Naik > bql deprecated field isn't deprecated - BigQueryOperator > > > Key: AIRFLOW-3075 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3075 > Project: Apache Airflow > Issue Type: Bug >Reporter: jack >Assignee: Kaxil Naik >Priority: Critical > > docs say: > [http://airflow.incubator.apache.org/integration.html?highlight=bigquery#bigquerytobigqueryoperator] > * *bql* (_Can receive a str representing a sql statement__,_ _a list of str_ > _(__sql statements__)__, or__reference to a template file. Template reference > are recognized by str ending in '.sql'._) – (Deprecated. Use ??sql?? > parameter instead) the sql code to be executed (templated) > * *sql* (_Can receive a str representing a sql statement__,_ _a list of str_ > _(__sql statements__)__, or__reference to a template file. Template reference > are recognized by str ending in '.sql'._) – the sql code to be executed > (templated) > > When creating an operator with *sql* and adding the new DAG the UI shows: > Broken DAG: [/home/ubuntu/airflow/dags/my_dag.py] Argument ['bql'] is required > > The DAG cannot run. > Only when changing the sql to bql is the error gone and the DAG can run. > I'm using 1.9.0 > > This is weird because I must use the deprecated bql and not the new sql > argument. > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
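The resolution above hinges on a deprecation shim: in 1.10 the operator accepts the old `bql` argument with a warning while mapping it onto `sql`, whereas 1.9 still requires `bql`. A minimal sketch of that pattern (illustrative only; `resolve_sql` is not the operator's actual code):

```python
# Illustrative deprecation-shim pattern: accept the deprecated argument,
# warn, and fall through to the new one. Not BigQueryOperator's real code.
import warnings

def resolve_sql(sql=None, bql=None):
    if bql is not None:
        warnings.warn("`bql` is deprecated; use `sql` instead",
                      DeprecationWarning)
        sql = sql if sql is not None else bql
    if sql is None:
        # Mirrors the "Argument ['...'] is required" broken-DAG error.
        raise ValueError("Argument ['sql'] is required")
    return sql

assert resolve_sql(sql='SELECT 1') == 'SELECT 1'
assert resolve_sql(bql='SELECT 1') == 'SELECT 1'  # old name still works
```

With this shim in place, DAGs written against either argument keep loading during the transition release, which is why the 1.9 docs (pre-shim) still mandate `bql`.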
[jira] [Updated] (AIRFLOW-3078) Basic operators for Google Compute Engine
[ https://issues.apache.org/jira/browse/AIRFLOW-3078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-3078: Component/s: gcp contrib > Basic operators for Google Compute Engine > - > > Key: AIRFLOW-3078 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3078 > Project: Apache Airflow > Issue Type: New Feature > Components: contrib, gcp >Reporter: Jarek Potiuk >Assignee: Kaxil Naik >Priority: Major > > In order to be able to interact with raw Google Compute Engine, we need an > operator that should be able to: > For managing individual machines: > * Start Instance: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/start]) > * Set Machine Type > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType]) > > * Stop Instance: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/stop]) > Also we should be able to manipulate instance groups: > * Get instance group: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/get]) > * Insert Group: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/insert]) > * Update Group: > ([https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/update]) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (AIRFLOW-3078) Basic operators for Google Compute Engine
[ https://issues.apache.org/jira/browse/AIRFLOW-3078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-3078: Priority: Trivial (was: Major) > Basic operators for Google Compute Engine > - > > Key: AIRFLOW-3078 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3078 > Project: Apache Airflow > Issue Type: New Feature > Components: contrib, gcp >Reporter: Jarek Potiuk >Assignee: Kaxil Naik >Priority: Trivial > > In order to be able to interact with raw Google Compute Engine, we need an > operator that should be able to: > For managing individual machines: > * Start Instance: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/start]) > * Set Machine Type > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType]) > > * Stop Instance: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/stop]) > Also we should be able to manipulate instance groups: > * Get instance group: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/get]) > * Insert Group: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/insert]) > * Update Group: > ([https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/update]) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (AIRFLOW-3078) Basic operators for Google Compute Engine
[ https://issues.apache.org/jira/browse/AIRFLOW-3078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-3078: Labels: contrib google (was: ) > Basic operators for Google Compute Engine > - > > Key: AIRFLOW-3078 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3078 > Project: Apache Airflow > Issue Type: New Feature > Components: contrib, gcp >Reporter: Jarek Potiuk >Assignee: Kaxil Naik >Priority: Major > > In order to be able to interact with raw Google Compute Engine, we need an > operator that should be able to: > For managing individual machines: > * Start Instance: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/start]) > * Set Machine Type > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType]) > > * Stop Instance: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/stop]) > Also we should be able to manipulate instance groups: > * Get instance group: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/get]) > * Insert Group: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/insert]) > * Update Group: > ([https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/update]) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (AIRFLOW-3078) Basic operators for Google Compute Engine
[ https://issues.apache.org/jira/browse/AIRFLOW-3078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-3078: Labels: (was: contrib google) > Basic operators for Google Compute Engine > - > > Key: AIRFLOW-3078 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3078 > Project: Apache Airflow > Issue Type: New Feature > Components: contrib, gcp >Reporter: Jarek Potiuk >Assignee: Kaxil Naik >Priority: Major > > In order to be able to interact with raw Google Compute Engine, we need an > operator that should be able to: > For managing individual machines: > * Start Instance: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/start]) > * Set Machine Type > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType]) > > * Stop Instance: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/stop]) > Also we should be able to manipulate instance groups: > * Get instance group: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/get]) > * Insert Group: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/insert]) > * Update Group: > ([https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/update]) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-3078) Basic operators for Google Compute Engine
[ https://issues.apache.org/jira/browse/AIRFLOW-3078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618020#comment-16618020 ] Kaxil Naik commented on AIRFLOW-3078: - Will work on this in coming days > Basic operators for Google Compute Engine > - > > Key: AIRFLOW-3078 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3078 > Project: Apache Airflow > Issue Type: New Feature >Reporter: Jarek Potiuk >Assignee: Kaxil Naik >Priority: Major > > In order to be able to interact with raw Google Compute Engine, we need an > operator that should be able to: > For managing individual machines: > * Start Instance: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/start]) > * Set Machine Type > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType]) > > * Stop Instance: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/stop]) > Also we should be able to manipulate instance groups: > * Get instance group: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/get]) > * Insert Group: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/insert]) > * Update Group: > ([https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/update]) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (AIRFLOW-3078) Basic operators for Google Compute Engine
[ https://issues.apache.org/jira/browse/AIRFLOW-3078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik reassigned AIRFLOW-3078: --- Assignee: Kaxil Naik > Basic operators for Google Compute Engine > - > > Key: AIRFLOW-3078 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3078 > Project: Apache Airflow > Issue Type: New Feature >Reporter: Jarek Potiuk >Assignee: Kaxil Naik >Priority: Major > > In order to be able to interact with raw Google Compute Engine, we need an > operator that should be able to: > For managing individual machines: > * Start Instance: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/start]) > * Set Machine Type > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType]) > > * Stop Instance: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/stop]) > Also we should be able to manipulate instance groups: > * Get instance group: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/get]) > * Insert Group: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/insert]) > * Update Group: > ([https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/update]) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
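Each operation listed in the issue maps onto a zone-scoped Compute Engine REST path of the form `projects/{project}/zones/{zone}/instances/{instance}/{action}`. The sketch below only builds those paths; the base URL is an assumption for illustration, and real operators would issue authenticated requests through the Google API client rather than construct URLs by hand.

```python
# Illustrative path builder for the GCE instance actions named above.
# ASSUMPTION: base URL shown for illustration; a real hook would use the
# authenticated Google API client, not hand-built URLs.
BASE = 'https://www.googleapis.com/compute/v1'

def instance_action_url(project, zone, instance, action):
    # action is one of: 'start', 'stop', 'setMachineType' (per the issue)
    return (f'{BASE}/projects/{project}/zones/{zone}'
            f'/instances/{instance}/{action}')

url = instance_action_url('my-proj', 'us-central1-a', 'vm-1', 'start')
assert url.endswith('/zones/us-central1-a/instances/vm-1/start')
```

Instance-group-manager calls follow the same shape with `instanceGroupManagers` in place of `instances`, which is why one parameterized hook can back all six operators.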
[jira] [Updated] (AIRFLOW-1195) Cleared tasks in SubDagOperator do not trigger Parent dag_runs
[ https://issues.apache.org/jira/browse/AIRFLOW-1195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-1195: Fix Version/s: 1.10.1 2.0.0 > Cleared tasks in SubDagOperator do not trigger Parent dag_runs > -- > > Key: AIRFLOW-1195 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1195 > Project: Apache Airflow > Issue Type: Bug > Components: subdag >Affects Versions: 1.8.1 >Reporter: Paul Zaczkieiwcz >Assignee: Kaxil Naik >Priority: Minor > Fix For: 2.0.0, 1.10.1 > > Attachments: example_subdag_operator.not-cleared.png, > example_subdag_operator.section-2.cleared.png > > > Let's say that you had a task fail in a SubDag. You fix the underlying issue > and want Airflow to resume the DagRun where it left off. If this were a flat > DAG, then all you need to do is clear the failed TaskInstance and its > downstream dependencies. The GUI will happily clear all of them for you in a > single PUT request! In order to resume a SubDag, you must clear the > TaskInstance + downstream dependencies AND you must clear the SubDagOperator > + downstream dependencies WITHOUT clearing its recursive dependencies. There > should be an option to recursively clear task instances in upstream SubDags. > The attached files use the example_subdag_operator DAG to illustrate the > problem. Before the screenshot, I ran the operator to completion, then > cleared {{example_subdag_operator.section-2.section-2-task-5}}. Notice that > {{example_subdag_operator.section-2}} is in the `running` state, but > {{example_subdag_operator}} is still in the `success` state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-1195) Cleared tasks in SubDagOperator do not trigger Parent dag_runs
[ https://issues.apache.org/jira/browse/AIRFLOW-1195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618013#comment-16618013 ] ASF GitHub Bot commented on AIRFLOW-1195: - kaxil closed pull request #3907: [AIRFLOW-1195] Add feature to clear tasks in Parent Dag URL: https://github.com/apache/incubator-airflow/pull/3907 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index fd8765588a..fb9ddbe2b0 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -705,7 +705,9 @@ def clear(args): only_failed=args.only_failed, only_running=args.only_running, confirm_prompt=not args.no_confirm, -include_subdags=not args.exclude_subdags) +include_subdags=not args.exclude_subdags, +include_parentdag=not args.exclude_parentdag, +) def get_num_ready_workers_running(gunicorn_master_proc): @@ -1604,6 +1606,10 @@ class CLIFactory(object): 'exclude_subdags': Arg( ("-x", "--exclude_subdags"), "Exclude subdags", "store_true"), +'exclude_parentdag': Arg( +("-xp", "--exclude_parentdag"), +"Exclude ParentDAGS if the task cleared is a part of a SubDAG", +"store_true"), 'dag_regex': Arg( ("-dx", "--dag_regex"), "Search dag_id as regex instead of exact string", "store_true"), @@ -1936,7 +1942,7 @@ class CLIFactory(object): 'args': ( 'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir', 'upstream', 'downstream', 'no_confirm', 'only_failed', -'only_running', 'exclude_subdags', 'dag_regex'), +'only_running', 'exclude_subdags', 'exclude_parentdag', 'dag_regex'), }, { 'func': pause, 'help': "Pause a DAG", diff --git a/airflow/models.py b/airflow/models.py index d703810a77..1e4949e563 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -3798,9 +3798,11 @@ def clear( only_running=False, 
confirm_prompt=False, include_subdags=True, +include_parentdag=True, reset_dag_runs=True, dry_run=False, session=None, +get_tis=False, ): """ Clears a set of task instances associated with the current dag for @@ -3821,6 +3823,25 @@ def clear( tis = session.query(TI).filter(TI.dag_id == self.dag_id) tis = tis.filter(TI.task_id.in_(self.task_ids)) +if include_parentdag and self.is_subdag: + +p_dag = self.parent_dag.sub_dag( +task_regex=self.dag_id.split('.')[1], +include_upstream=False, +include_downstream=True) + +tis = tis.union(p_dag.clear( +start_date=start_date, end_date=end_date, +only_failed=only_failed, +only_running=only_running, +confirm_prompt=confirm_prompt, +include_subdags=include_subdags, +include_parentdag=False, +reset_dag_runs=reset_dag_runs, +get_tis=True, +session=session, +)) + if start_date: tis = tis.filter(TI.execution_date >= start_date) if end_date: @@ -3832,6 +3853,9 @@ def clear( if only_running: tis = tis.filter(TI.state == State.RUNNING) +if get_tis: +return tis + if dry_run: tis = tis.all() session.expunge_all() @@ -3875,6 +3899,7 @@ def clear_dags( only_running=False, confirm_prompt=False, include_subdags=True, +include_parentdag=False, reset_dag_runs=True, dry_run=False, ): @@ -3887,6 +3912,7 @@ def clear_dags( only_running=only_running, confirm_prompt=False, include_subdags=include_subdags, +include_parentdag=include_parentdag, reset_dag_runs=reset_dag_runs, dry_run=True) all_tis.extend(tis) diff --git a/airflow/www/views.py b/airflow/www/views.py index aa2530e458..be11b11376 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -,7 +,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin, count = dag.clear( start_date=start_date, end_date=end_date, -include_subdags=recursive) +include_subdags=recursive, +include_parentdag=recursive, +) flash("{0} task
[jira] [Resolved] (AIRFLOW-1195) Cleared tasks in SubDagOperator do not trigger Parent dag_runs
[ https://issues.apache.org/jira/browse/AIRFLOW-1195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik resolved AIRFLOW-1195. - Resolution: Fixed Resolved by https://github.com/apache/incubator-airflow/pull/3907 > Cleared tasks in SubDagOperator do not trigger Parent dag_runs > -- > > Key: AIRFLOW-1195 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1195 > Project: Apache Airflow > Issue Type: Bug > Components: subdag >Affects Versions: 1.8.1 >Reporter: Paul Zaczkieiwcz >Assignee: Kaxil Naik >Priority: Minor > Attachments: example_subdag_operator.not-cleared.png, > example_subdag_operator.section-2.cleared.png > > > Let's say that you had a task fail in a SubDag. You fix the underlying issue > and want Airflow to resume the DagRun where it left off. If this were a flat > DAG, then all you need to do is clear the failed TaskInstance and its > downstream dependencies. The GUI will happily clear all of them for you in a > single PUT request! In order to resume a SubDag, you must clear the > TaskInstance + downstream dependencies AND you must clear the SubDagOperator > + downstream dependencies WITHOUT clearing its recursive dependencies. There > should be an option to recursively clear task instances in upstream SubDags. > The attached files use the example_subdag_operator DAG to illustrate the > problem. Before the screenshot, I ran the operator to completion, then > cleared {{example_subdag_operator.section-2.section-2-task-5}}. Notice that > {{example_subdag_operator.section-2}} is in the `running` state, but > {{example_subdag_operator}} is still in the `success` state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] kaxil closed pull request #3907: [AIRFLOW-1195] Add feature to clear tasks in Parent Dag
kaxil closed pull request #3907: [AIRFLOW-1195] Add feature to clear tasks in Parent Dag URL: https://github.com/apache/incubator-airflow/pull/3907 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index fd8765588a..fb9ddbe2b0 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -705,7 +705,9 @@ def clear(args): only_failed=args.only_failed, only_running=args.only_running, confirm_prompt=not args.no_confirm, -include_subdags=not args.exclude_subdags) +include_subdags=not args.exclude_subdags, +include_parentdag=not args.exclude_parentdag, +) def get_num_ready_workers_running(gunicorn_master_proc): @@ -1604,6 +1606,10 @@ class CLIFactory(object): 'exclude_subdags': Arg( ("-x", "--exclude_subdags"), "Exclude subdags", "store_true"), +'exclude_parentdag': Arg( +("-xp", "--exclude_parentdag"), +"Exclude ParentDAGS if the task cleared is a part of a SubDAG", +"store_true"), 'dag_regex': Arg( ("-dx", "--dag_regex"), "Search dag_id as regex instead of exact string", "store_true"), @@ -1936,7 +1942,7 @@ class CLIFactory(object): 'args': ( 'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir', 'upstream', 'downstream', 'no_confirm', 'only_failed', -'only_running', 'exclude_subdags', 'dag_regex'), +'only_running', 'exclude_subdags', 'exclude_parentdag', 'dag_regex'), }, { 'func': pause, 'help': "Pause a DAG", diff --git a/airflow/models.py b/airflow/models.py index d703810a77..1e4949e563 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -3798,9 +3798,11 @@ def clear( only_running=False, confirm_prompt=False, include_subdags=True, +include_parentdag=True, reset_dag_runs=True, dry_run=False, session=None, +get_tis=False, ): """ Clears a set of task instances associated with the current dag for 
@@ -3821,6 +3823,25 @@ def clear( tis = session.query(TI).filter(TI.dag_id == self.dag_id) tis = tis.filter(TI.task_id.in_(self.task_ids)) +if include_parentdag and self.is_subdag: + +p_dag = self.parent_dag.sub_dag( +task_regex=self.dag_id.split('.')[1], +include_upstream=False, +include_downstream=True) + +tis = tis.union(p_dag.clear( +start_date=start_date, end_date=end_date, +only_failed=only_failed, +only_running=only_running, +confirm_prompt=confirm_prompt, +include_subdags=include_subdags, +include_parentdag=False, +reset_dag_runs=reset_dag_runs, +get_tis=True, +session=session, +)) + if start_date: tis = tis.filter(TI.execution_date >= start_date) if end_date: @@ -3832,6 +3853,9 @@ def clear( if only_running: tis = tis.filter(TI.state == State.RUNNING) +if get_tis: +return tis + if dry_run: tis = tis.all() session.expunge_all() @@ -3875,6 +3899,7 @@ def clear_dags( only_running=False, confirm_prompt=False, include_subdags=True, +include_parentdag=False, reset_dag_runs=True, dry_run=False, ): @@ -3887,6 +3912,7 @@ def clear_dags( only_running=only_running, confirm_prompt=False, include_subdags=include_subdags, +include_parentdag=include_parentdag, reset_dag_runs=reset_dag_runs, dry_run=True) all_tis.extend(tis) diff --git a/airflow/www/views.py b/airflow/www/views.py index aa2530e458..be11b11376 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -,7 +,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin, count = dag.clear( start_date=start_date, end_date=end_date, -include_subdags=recursive) +include_subdags=recursive, +include_parentdag=recursive, +) flash("{0} task instances have been cleared".format(count)) return redirect(origin) @@ -1120,7 +1122,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin, start_date=start_date, end_date=end_date,
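The core of the `include_parentdag` change above can be sketched with plain Python sets (the real code unions SQLAlchemy query results via `tis.union(p_dag.clear(..., get_tis=True))`, not Python sets; names below are illustrative):

```python
# Plain-set sketch of the patch's semantics: clearing a task inside a
# subdag also unions in the matching SubDagOperator task from the parent
# dag, so the parent dag_run is re-triggered instead of staying 'success'.
def clear(task_instances, parent_instances=None, include_parentdag=True):
    tis = set(task_instances)
    if include_parentdag and parent_instances is not None:
        # Mirrors: tis = tis.union(p_dag.clear(..., get_tis=True))
        tis = tis.union(parent_instances)
    return tis

subdag_tis = {'section-2.section-2-task-5'}
parent_tis = {'section-2'}  # the SubDagOperator task in the parent dag

assert clear(subdag_tis, parent_tis) == {
    'section-2.section-2-task-5', 'section-2'}
assert clear(subdag_tis, parent_tis, include_parentdag=False) == subdag_tis
```

The recursive call passes `include_parentdag=False` and `get_tis=True` so the parent contributes its matching task instances exactly once, without clearing further up or committing a second time.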
[jira] [Commented] (AIRFLOW-3072) Only admin can view logs in RBAC UI
[ https://issues.apache.org/jira/browse/AIRFLOW-3072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617953#comment-16617953 ] ASF GitHub Bot commented on AIRFLOW-3072: - seelmann opened a new pull request #3913: [AIRFLOW-3072] Assign permission get_logs_with_metadata to viewer role URL: https://github.com/apache/incubator-airflow/pull/3913 Make sure you have checked _all_ steps below. ### Jira - [X] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. - https://issues.apache.org/jira/browse/AIRFLOW-3072 ### Description - [X] Allow non-admin roles to view logs. ### Tests - [X] My PR adds unit tests ### Commits - [X] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 1. Subject is limited to 50 characters (not including Jira issue reference) 1. Subject does not end with a period 1. Subject uses the imperative mood ("add", not "adding") 1. Body wraps at 72 characters 1. Body explains "what" and "why", not "how" ### Documentation - [ ] In case of new functionality, my PR adds documentation that describes how to use it. - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added. ### Code Quality - [X] Passes `git diff upstream/master -u -- "*.py" | flake8 --diff` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Only admin can view logs in RBAC UI > --- > > Key: AIRFLOW-3072 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3072 > Project: Apache Airflow > Issue Type: Bug > Components: ui >Affects Versions: 1.10.0 >Reporter: Stefan Seelmann >Assignee: Stefan Seelmann >Priority: Major > > With RBAC enabled, only users with role admin can view logs. > The default roles (excluding public) include permission {{can_log}} which > allows opening the /log page; however, the actual log message is loaded with > another XHR request which requires the additional permission > {{get_logs_with_metadata}}. > My suggestion is to add the permission and assign it to the viewer role. Or is > there a reason why only admin should be able to see logs? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] seelmann opened a new pull request #3913: [AIRFLOW-3072] Assign permission get_logs_with_metadata to viewer role
seelmann opened a new pull request #3913: [AIRFLOW-3072] Assign permission get_logs_with_metadata to viewer role URL: https://github.com/apache/incubator-airflow/pull/3913 Make sure you have checked _all_ steps below. ### Jira - [X] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. - https://issues.apache.org/jira/browse/AIRFLOW-3072 ### Description - [X] Allow non-admin roles to view logs. ### Tests - [X] My PR adds unit tests ### Commits - [X] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 1. Subject is limited to 50 characters (not including Jira issue reference) 1. Subject does not end with a period 1. Subject uses the imperative mood ("add", not "adding") 1. Body wraps at 72 characters 1. Body explains "what" and "why", not "how" ### Documentation - [ ] In case of new functionality, my PR adds documentation that describes how to use it. - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added. ### Code Quality - [X] Passes `git diff upstream/master -u -- "*.py" | flake8 --diff` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (AIRFLOW-3072) Only admin can view logs in RBAC UI
[ https://issues.apache.org/jira/browse/AIRFLOW-3072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Seelmann updated AIRFLOW-3072: - Description: With RBAC enabled, only users with the admin role can view logs. The default roles (excluding public) include the permission {{can_log}}, which allows opening the /log page; however, the actual log message is loaded with another XHR request, which requires the additional permission {{get_logs_with_metadata}}. My suggestion is to add the permission and assign it to the viewer role. Or is there a reason why only admin should be able to see logs? was: With RBAC enabled, only users with the admin role can view logs. The cause is that there is no permission for {{get_logs_with_metadata}} defined in {{security.py}}. My suggestion is to add the permission and assign it to the viewer role. Or is there a reason why only admin should be able to see logs? > Only admin can view logs in RBAC UI > --- > > Key: AIRFLOW-3072 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3072 > Project: Apache Airflow > Issue Type: Bug > Components: ui >Affects Versions: 1.10.0 >Reporter: Stefan Seelmann >Assignee: Stefan Seelmann >Priority: Major > > With RBAC enabled, only users with the admin role can view logs. > The default roles (excluding public) include the permission {{can_log}}, which > allows opening the /log page; however, the actual log message is loaded with > another XHR request, which requires the additional permission > {{get_logs_with_metadata}}. > My suggestion is to add the permission and assign it to the viewer role. Or is > there a reason why only admin should be able to see logs?
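[Editor's note] The fix proposed in AIRFLOW-3072 amounts to granting `get_logs_with_metadata` to every role that already holds `can_log`, so the XHR that fetches the log body succeeds for non-admin users. The sketch below models this with a plain dictionary; the role names and the mapping structure are hypothetical illustrations, not Airflow's actual `security.py` API.

```python
# Hypothetical role -> permission mapping illustrating the AIRFLOW-3072 fix.
# 'can_log' lets a user open the /log page; 'get_logs_with_metadata' is the
# separate permission required by the XHR that loads the log text itself.
ROLE_PERMISSIONS = {
    "Admin": {"can_log", "get_logs_with_metadata"},
    "Viewer": {"can_log"},  # can open /log, but the log-body XHR is denied
    "User": {"can_log"},
    "Op": {"can_log"},
}

def grant_log_metadata(role_permissions):
    """Grant get_logs_with_metadata wherever can_log is already granted."""
    for perms in role_permissions.values():
        if "can_log" in perms:
            perms.add("get_logs_with_metadata")
    return role_permissions

grant_log_metadata(ROLE_PERMISSIONS)
print("get_logs_with_metadata" in ROLE_PERMISSIONS["Viewer"])  # True
```

After the grant, every role that can open the log page can also fetch its contents, which matches the PR's intent of letting the viewer role see logs.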
[GitHub] ChengzhiZhao commented on issue #3858: [AIRFLOW-2929] Add get and set for pool class in models
ChengzhiZhao commented on issue #3858: [AIRFLOW-2929] Add get and set for pool class in models URL: https://github.com/apache/incubator-airflow/pull/3858#issuecomment-422116297 @feng-tao Any feedback on this PR? @Fokko please suggest if we still want to use `from airflow.api.common.experimental import pool` moving forward. Thanks, both!
[GitHub] zhhuta commented on issue #3744: [AIRFLOW-2893] fix stuck dataflow job due to name mismatch
zhhuta commented on issue #3744: [AIRFLOW-2893] fix stuck dataflow job due to name mismatch URL: https://github.com/apache/incubator-airflow/pull/3744#issuecomment-422107918 @fenglu-g When can we expect to see the 1.10.1 version in Google Composer? It is still at 1.9.0.
[GitHub] jbacon commented on issue #3891: [AIRFLOW-3029] New Operator - SqlOperator
jbacon commented on issue #3891: [AIRFLOW-3029] New Operator - SqlOperator URL: https://github.com/apache/incubator-airflow/pull/3891#issuecomment-422081229 The `SqlOperator` has a slightly different use-case than the `GenericTransfer` operator, but they both similarly abstract the `DbApiHook` from the operator via `BaseHook.get_hook(..)`, which is why we don't have a `PostgresGenericTransfer`, `MySqlGenericTransfer`, and so on. `SqlOperator` isn't intended to fetch any rows from the database, just to execute SQL that either succeeds or fails.
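[Editor's note] The operator pattern described in this comment (resolve a DB-API connection from a connection id, run the SQL, and report only success or failure) can be sketched as follows. Here `get_hook` is a hypothetical stub backed by an in-memory SQLite database standing in for Airflow's `BaseHook.get_hook(conn_id)`, and the class is a simplified sketch, not the PR's actual implementation.

```python
import sqlite3

def get_hook(conn_id):
    # Hypothetical stand-in for BaseHook.get_hook(conn_id): any DB-API
    # connection would do; an in-memory SQLite DB keeps the sketch runnable.
    return sqlite3.connect(":memory:")

class SqlOperator:
    """Execute SQL against a hook-resolved connection; fetch no rows."""

    def __init__(self, conn_id, sql):
        self.conn_id = conn_id
        self.sql = sql

    def execute(self):
        conn = get_hook(self.conn_id)
        try:
            conn.execute(self.sql)
            conn.commit()
        finally:
            conn.close()
        # Nothing is returned: an exception marks the task failed,
        # a clean return marks it succeeded.

SqlOperator("hypothetical_conn", "CREATE TABLE t (id INTEGER)").execute()
```

Because the operator never touches result rows, the same class works unchanged for any database whose hook exposes a DB-API connection, which is the point of not having per-database variants.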
[GitHub] andscoop commented on issue #3890: [AIRFLOW-3049] Add extra operations for Mongo hook
andscoop commented on issue #3890: [AIRFLOW-3049] Add extra operations for Mongo hook URL: https://github.com/apache/incubator-airflow/pull/3890#issuecomment-422063972 @dlebech @Fokko When I was originally writing unit tests, I don't believe that we were using docker-compose, or if we were, I was not aware of it. I'm not familiar with all the reasons for moving to docker-compose, but I don't know that I see a compelling reason to move away from mocking the responses in the hook, as all responses are known beforehand.
[GitHub] dlebech commented on issue #3890: [AIRFLOW-3049] Add extra operations for Mongo hook
dlebech commented on issue #3890: [AIRFLOW-3049] Add extra operations for Mongo hook URL: https://github.com/apache/incubator-airflow/pull/3890#issuecomment-422052369 @Fokko Thanks! Regarding mocking vs. a real instance, I'm not sure I'm in the best position to answer that, since I basically just built this PR upon the pre-existing setup, so I cannot say why `mongomock` was originally chosen. But it looks like a pretty good library for this kind of thing, and since Travis is already taking a very long time to run, I assume mocking speeds things up a bit in this case. I don't know how much that matters, but at least from my perspective, it feels like mocking is an OK thing to do. Perhaps the original author @andscoop has a better opinion than mine?
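[Editor's note] The testing trade-off discussed in these two comments (mocking known responses versus running a real Mongo in docker-compose) can be illustrated with a small sketch. It uses `unittest.mock` rather than `mongomock`, and `MongoHook` here is a hypothetical stand-in for the hook in the PR: because the collection's responses are fixed up front, the test needs no server or container and runs instantly.

```python
from unittest import mock

class MongoHook:
    """Hypothetical sketch of a Mongo hook wrapping one collection."""

    def __init__(self, collection):
        self.collection = collection

    def insert_many(self, docs):
        # Delegate to the pymongo-style collection and surface the ids.
        return self.collection.insert_many(docs).inserted_ids

def test_insert_many_returns_ids():
    # The mock's response is known beforehand, so no Mongo server is needed.
    collection = mock.MagicMock()
    collection.insert_many.return_value.inserted_ids = [1, 2, 3]
    hook = MongoHook(collection)
    assert hook.insert_many([{"a": 1}, {"a": 2}, {"a": 3}]) == [1, 2, 3]
    collection.insert_many.assert_called_once()

test_insert_many_returns_ids()
```

The cost of this speed is that the mock only checks the hook's own logic, not real server behavior, which is exactly the trade-off the docker-compose question is about.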
[jira] [Created] (AIRFLOW-3079) initdb fails on Microsoft SQL Server
Morten Post created AIRFLOW-3079: Summary: initdb fails on Microsoft SQL Server Key: AIRFLOW-3079 URL: https://issues.apache.org/jira/browse/AIRFLOW-3079 Project: Apache Airflow Issue Type: Bug Components: database Affects Versions: 1.10.0 Reporter: Morten Post airflow initdb fails using Microsoft SQL Server 17 backend. Problem does not exist in 1.9.0. [*@ airflow]$ airflow initdb [2018-09-17 14:08:28,744] \{settings.py:174} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800 [2018-09-17 14:08:28,865] \{__init__.py:51} INFO - Using executor SequentialExecutor DB: DB: mssql+pyodbc://***/Airflow?driver=ODBC Driver 17 for SQL Server [2018-09-17 14:08:28,967] \{db.py:338} INFO - Creating tables INFO [alembic.runtime.migration] Context impl MSSQLImpl. INFO [alembic.runtime.migration] Will assume transactional DDL. INFO [alembic.runtime.migration] Running upgrade -> e3a246e0dc1, current schema INFO [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted INFO [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations INFO [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_isntance INFO [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices INFO [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log INFO [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun INFO [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration INFO [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config INFO [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user INFO [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end INFO [alembic.runtime.migration] Running upgrade 
4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss INFO [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection INFO [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table INFO [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table INFO [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index INFO [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table INFO [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table INFO [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables INFO [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices INFO [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance INFO [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table INFO [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance INFO [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> bdaa763e6c56, Make xcom value column a large binary INFO [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 947454bf1dff, add ti job_id index INFO [alembic.runtime.migration] Running upgrade 947454bf1dff -> d2ae31099d61, Increase text size for MySQL (not relevant for other DBs' text types) INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 0e2a74e0fc9f, Add time zone awareness Traceback (most recent call last): File "/bin/airflow", line 32, in args.func(args) File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 1002, in initdb db_utils.initdb(settings.RBAC) File 
"/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 92, in initdb upgradedb() File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 346, in upgradedb command.upgrade(config, 'heads') File "/usr/lib/python2.7/site-packages/alembic/command.py", line 174, in upgrade script.run_env() File "/usr/lib/python2.7/site-packages/alembic/script/base.py", line 416, in run_env util.load_python_file(self.dir, 'env.py') File "/usr/lib/python2.7/site-packages/alembic/util/pyfiles.py", line 93, in load_python_file module = load_module_py(module_id, path) File "/usr/lib/python2.7/site-packages/alembic/util/compat.py", line 79, in load_module_py mod = imp.load_source(module_id, path, fp) File "/usr/lib/python2.7/site-packages/airflow/migrations/env.py", line 91, in run_migrations_online()
[jira] [Updated] (AIRFLOW-3078) Basic operators for Google Compute Engine
[ https://issues.apache.org/jira/browse/AIRFLOW-3078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jarosław Potiuk updated AIRFLOW-3078: - Description: In order to be able to interact with raw Google Compute Engine, we need an operator that should be able to: For managing individual machines: * Start Instance: ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/start]) * Set Machine Type ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType]) * Stop Instance: ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/stop]) Also we should be able to manipulate instance groups: * Get instance group: ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/get]) * Insert Group: ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/insert]) * Update Group: ([https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/update]) was: In order to be able to interact with raw Google Compute Engine, we need an operator that should be able to: For managing individual machines: * Start Instance: ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/start]) * Set Machine Type ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType]) * Stop Instance: ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/stop]) Also we should be able to manipulate instance groups: * Get instance group: ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/get]) * Insert Group:([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/insert]) * Update Group:([https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/update]) > Basic operators for Google Compute Engine > - > > Key: AIRFLOW-3078 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3078 > Project: Apache Airflow > Issue Type: New Feature >Reporter: Jarosław Potiuk 
>Priority: Major > > In order to be able to interact with raw Google Compute Engine, we need an > operator that should be able to: > For managing individual machines: > * Start Instance: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/start]) > * Set Machine Type > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType]) > > * Stop Instance: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instances/stop]) > Also we should be able to manipulate instance groups: > * Get instance group: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/get]) > * Insert Group: > ([https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/insert]) > * Update Group: > ([https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/update]) >
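[Editor's note] With the Python Google API client, the `instances.start` REST endpoint listed in AIRFLOW-3078 is reached through the call chain `compute.instances().start(project=..., zone=..., instance=...).execute()`, and an operator's `execute` would roughly wrap that call. In the sketch below, `compute` is a `MagicMock` standing in for a client built with `googleapiclient.discovery.build`, so it runs without GCP credentials; the project, zone, and instance names are made up.

```python
from unittest import mock

def start_instance(compute, project, zone, instance):
    """Sketch of the GCE instances.start call a start-instance operator
    might make. In the real API client this returns a zone Operation
    resource that should then be polled until the instance is running."""
    request = compute.instances().start(
        project=project, zone=zone, instance=instance)
    return request.execute()

# MagicMock stands in for the discovery-built client, with a canned reply.
compute = mock.MagicMock()
compute.instances().start.return_value.execute.return_value = {
    "status": "RUNNING"}

op = start_instance(compute, "my-project", "us-central1-a", "worker-1")
print(op["status"])  # RUNNING
```

The stop and setMachineType endpoints follow the same `compute.instances().<method>(...).execute()` shape, which is why one thin hook could back all three proposed operators.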