[jira] [Commented] (AIRFLOW-1002) Add ability to remove DAG and all dependencies
[ https://issues.apache.org/jira/browse/AIRFLOW-1002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714592#comment-16714592 ]

ASF GitHub Bot commented on AIRFLOW-1002:
-----------------------------------------

ashb closed pull request #2684: [AIRFLOW-1002] Add ability to remove DAG and all dependencies
URL: https://github.com/apache/incubator-airflow/pull/2684

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/api/client/api_client.py b/airflow/api/client/api_client.py
index f24d80945f..1122bbd1d2 100644
--- a/airflow/api/client/api_client.py
+++ b/airflow/api/client/api_client.py
@@ -32,6 +32,15 @@ def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
         """
         raise NotImplementedError()
 
+    def delete_dag(self, dag_id):
+        """
+        Creates a dag run for the specified dag
+        :param dag_id:
+        :param conf:
+        :return:
+        """
+        raise NotImplementedError()
+
     def get_pool(self, name):
         """Get pool.
diff --git a/airflow/api/client/json_client.py b/airflow/api/client/json_client.py
index 37e24d3c4e..789f969cbb 100644
--- a/airflow/api/client/json_client.py
+++ b/airflow/api/client/json_client.py
@@ -50,6 +50,19 @@ def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
                              })
         return data['message']
 
+    def delete_dag(self, dag_id):
+        endpoint = '/api/experimental/dags/{}'.format(dag_id)
+        url = urljoin(self._api_base_url, endpoint)
+
+        resp = requests.delete(url, auth=self._auth)
+
+        if not resp.ok:
+            raise IOError()
+
+        data = resp.json()
+
+        return data['message']
+
     def get_pool(self, name):
         endpoint = '/api/experimental/pools/{}'.format(name)
         url = urljoin(self._api_base_url, endpoint)
diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py
index 5bc7f76aaa..e2fae9590d 100644
--- a/airflow/api/client/local_client.py
+++ b/airflow/api/client/local_client.py
@@ -14,7 +14,7 @@
 
 from airflow.api.client import api_client
 from airflow.api.common.experimental import pool
-from airflow.api.common.experimental import trigger_dag
+from airflow.api.common.experimental import trigger_dag, delete_dag
 
 
 class Client(api_client.Client):
@@ -27,6 +27,10 @@ def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
                                      execution_date=execution_date)
         return "Created {}".format(dr)
 
+    def delete_dag(self, dag_id):
+        dr = delete_dag.delete_dag(dag_id)
+        return "Deleted {}".format(dr)
+
     def get_pool(self, name):
         p = pool.get_pool(name=name)
         return p.pool, p.slots, p.description
diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/experimental/delete_dag.py
new file mode 100644
index 00..3cad1b987a
--- /dev/null
+++ b/airflow/api/common/experimental/delete_dag.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow import AirflowException
+from airflow.models import DagBag, DAG
+
+
+def delete_dag(dag_id, dag_bag=None):
+    """
+
+    :param dag: DAG to be deleted
+    :param session: orm session
+    :return: Returns true if dagrun is scheduled and successfully deleted
+             Returns false if dagrun does not exist
+    """
+
+    if dag_id is None:
+        return False
+
+    dag_bag = dag_bag or DagBag()
+
+    if dag_id not in dag_bag.dags:
+        print ("dag not found")
+        raise AirflowException("Dag id {} not found".format(dag_id))
+
+    dag = dag_bag.get_dag(dag_id)
+
+    alldags = [dag]
+    alldags.extend(dag.subdags)
+
+    for dag in alldags:
+        dag.delete()
+
+    return True
+
+def check_delete_dag(dag_id):
+    return DAG.find_deleted_entities(dag_id)
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 2675bd3167..9fd54422ac 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2465,6 +2465,8 @@ def __init__(
             pool=None,
             *args, **kwargs):
         self.task_instance = task_instance
+        self.dag_id = self.task_instance.dag_id
+
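
The control flow of the new delete_dag helper in the diff above (return False for a missing id, raise when the id is unknown, then delete the DAG together with its subdags) can be tried without an Airflow installation. The following is only a minimal sketch under stated assumptions: FakeDag, DagNotFound and the plain dict used in place of a DagBag are hypothetical stand-ins invented for illustration, not Airflow classes.

```python
class DagNotFound(Exception):
    """Hypothetical stand-in for AirflowException."""


class FakeDag:
    """Hypothetical stand-in for a DAG object; only what the sketch needs."""

    def __init__(self, dag_id, subdags=()):
        self.dag_id = dag_id
        self.subdags = list(subdags)
        self.deleted = False

    def delete(self):
        # A real implementation would remove database records here.
        self.deleted = True


def delete_dag(dag_id, dags):
    """Mirror the flow of the diff: dags is a dict of dag_id -> FakeDag."""
    if dag_id is None:
        return False

    if dag_id not in dags:
        raise DagNotFound("Dag id {} not found".format(dag_id))

    dag = dags[dag_id]

    # Delete the parent first, then every subdag attached to it.
    for target in [dag] + dag.subdags:
        target.delete()

    return True
```

With a parent that owns one subdag, a single call marks both as deleted, while an unknown id raises instead of silently returning.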
[jira] [Commented] (AIRFLOW-1002) Add ability to remove DAG and all dependencies
[ https://issues.apache.org/jira/browse/AIRFLOW-1002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16428874#comment-16428874 ]

Kyle Hamlin commented on AIRFLOW-1002:
--------------------------------------

I think there is still a bit of missing functionality for this feature. When the Flask app is created sessions are attached to each of the models but DagModel doesn't have a session attached to it. If you try to call delete_dag from a CeleryExecutor you get a sqlalchemy.orm.exc.DetachedInstanceError
{code:java}
Traceback (most recent call last):
  File "", line 1, in
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 72, in wrapper
    raise e
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 69, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 226, in delete_dag
    message = api_client.delete_dag(dag_id=args.dag_id)
  File "/usr/local/lib/python3.6/site-packages/airflow/api/client/local_client.py", line 32, in delete_dag
    count = delete_dag.delete_dag(dag_id)
  File "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/delete_dag.py", line 48, in delete_dag
    if dag.is_subdag:
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/attributes.py", line 237, in __get__
    return self.impl.get(instance_state(instance), dict_)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/attributes.py", line 579, in get
    value = state._load_expired(state, passive)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 592, in _load_expired
    self.manager.deferred_scalar_loader(self, toload)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 659, in load_scalar_attributes
    (state_str(state)))
sqlalchemy.orm.exc.DetachedInstanceError: Instance is not bound to a Session; attribute refresh operation cannot proceed{code}

> Add ability to remove DAG and all dependencies
> ---
>
>                 Key: AIRFLOW-1002
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1002
>             Project: Apache Airflow
>          Issue Type: New Feature
>          Components: core, DAG
>    Affects Versions: Airflow 2.0, Airflow 1.8
>            Reporter: Ruslan Dautkhanov
>            Assignee: Kengo Seki
>            Priority: Major
>             Fix For: 2.0.0
>
>
> I don't see an obvious way to remove a DAG with dependencies
> Looks like folks have custom scripts to clean up backend database directly.
> http://stackoverflow.com/questions/40651783/airflow-how-to-delete-a-dag
> https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0
> It would be nice to have a feature to remove a DAG directly in UI / through
> API.
> {code}
> queries = {'delete from xcom where dag_id = "' + dag_input + '"',
>         'delete from task_instance where dag_id = "' + dag_input + '"',
>         'delete from sla_miss where dag_id = "' + dag_input + '"',
>         'delete from log where dag_id = "' + dag_input + '"',
>         'delete from job where dag_id = "' + dag_input + '"',
>         'delete from dag_run where dag_id = "' + dag_input + '"',
>         'delete from dag where dag_id = "' + dag_input + '"' }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
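
The cleanup queries quoted in the issue description build SQL by concatenating dag_input into the statement text. A minimal sketch of the same per-table cleanup with bound parameters follows. It assumes an in-memory SQLite database and a simplified one-column schema (only dag_id) purely for illustration; the real Airflow metadata tables have more columns, and the helper names delete_dag_records and make_demo_db are hypothetical.

```python
import sqlite3

# Table names taken from the issue description, in the order listed there.
TABLES = ("xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag")


def delete_dag_records(conn, dag_id):
    """Delete rows for dag_id from every table; return total rows removed."""
    removed = 0
    with conn:  # commits on success, rolls back if any statement fails
        for table in TABLES:
            # Table names come from the fixed tuple above, never from input;
            # the dag_id value itself is passed as a bound parameter.
            cur = conn.execute(
                "DELETE FROM {} WHERE dag_id = ?".format(table), (dag_id,)
            )
            removed += cur.rowcount
    return removed


def make_demo_db():
    """Build a throwaway database with two DAG ids in each table."""
    conn = sqlite3.connect(":memory:")
    for table in TABLES:
        conn.execute("CREATE TABLE {} (dag_id TEXT)".format(table))
        conn.executemany(
            "INSERT INTO {} VALUES (?)".format(table),
            [("example_dag",), ("other_dag",)],
        )
    conn.commit()
    return conn
```

Running all deletes inside one transaction means a failure part-way through leaves no table half-cleaned, which the standalone query strings above do not guarantee.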
[jira] [Commented] (AIRFLOW-1002) Add ability to remove DAG and all dependencies
[ https://issues.apache.org/jira/browse/AIRFLOW-1002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359479#comment-16359479 ]

ASF subversion and git services commented on AIRFLOW-1002:
----------------------------------------------------------

Commit 7488f2938da4e08645060531aa363204db7f50a5 in incubator-airflow's branch refs/heads/master from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=7488f29 ]

[AIRFLOW-1002] Add ability to clean all dependencies of removed DAG

After removing a dag file, there is no way to clean database except for removing
corresponding records directly for now. This PR enables user to do this via
command line.

Closes #2199 from sekikn/AIRFLOW-1002

> Add ability to remove DAG and all dependencies
> ---
>
>                 Key: AIRFLOW-1002
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1002
>             Project: Apache Airflow
>          Issue Type: New Feature
>          Components: core, DAG
>    Affects Versions: Airflow 2.0, Airflow 1.8
>            Reporter: Ruslan Dautkhanov
>            Assignee: Kengo Seki
>            Priority: Major
>             Fix For: 2.0.0
>
>
> I don't see an obvious way to remove a DAG with dependencies
> Looks like folks have custom scripts to clean up backend database directly.
> http://stackoverflow.com/questions/40651783/airflow-how-to-delete-a-dag
> https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0
> It would be nice to have a feature to remove a DAG directly in UI / through
> API.
> {code}
> queries = {'delete from xcom where dag_id = "' + dag_input + '"',
>         'delete from task_instance where dag_id = "' + dag_input + '"',
>         'delete from sla_miss where dag_id = "' + dag_input + '"',
>         'delete from log where dag_id = "' + dag_input + '"',
>         'delete from job where dag_id = "' + dag_input + '"',
>         'delete from dag_run where dag_id = "' + dag_input + '"',
>         'delete from dag where dag_id = "' + dag_input + '"' }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (AIRFLOW-1002) Add ability to remove DAG and all dependencies
[ https://issues.apache.org/jira/browse/AIRFLOW-1002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16204992#comment-16204992 ]

Arnab commented on AIRFLOW-1002:

I was about to file a new JIRA for deletion of dags but noticed this one exists. So updating this jira with my comments.

- Extended code for delete endpoint to handle non-existing DAG case (return 404)
- Support for subdags
  The way I read the requirement is if a dag has subdags (dag id for subdag is .), then the subdag and its associated data should get deleted as well when the user deletes the parent dag. Deleting the subdag by itself should also be supported
- Also, if the scheduler is actively generating tasks for a dag, thought it may be a good idea to pause the dag (on which delete is called) first to prevent any additional tasks being added and then delete the same.
- Considered additional tables to remove the dag record from (such as Xcom)
- Use 204(no content) as return code for delete
- Added 3 testcases(test_delete_invalid_dag, test_delete_dag_after_schedule_dag and test_delete_dag_after_schedule_dag)
  The first two tests are passing. The third one seems to be ok but is failing in the PR. Any help appreciated.

Submitted PR : https://github.com/apache/incubator-airflow/pull/2199

> Add ability to remove DAG and all dependencies
> ---
>
>                 Key: AIRFLOW-1002
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1002
>             Project: Apache Airflow
>          Issue Type: New Feature
>          Components: core, DAG
>    Affects Versions: Airflow 2.0, Airflow 1.8
>            Reporter: Ruslan Dautkhanov
>            Assignee: Kengo Seki
>
> I don't see an obvious way to remove a DAG with dependencies
> Looks like folks have custom scripts to clean up backend database directly.
> http://stackoverflow.com/questions/40651783/airflow-how-to-delete-a-dag
> https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0
> It would be nice to have a feature to remove a DAG directly in UI / through
> API.
> {code}
> queries = {'delete from xcom where dag_id = "' + dag_input + '"',
>         'delete from task_instance where dag_id = "' + dag_input + '"',
>         'delete from sla_miss where dag_id = "' + dag_input + '"',
>         'delete from log where dag_id = "' + dag_input + '"',
>         'delete from job where dag_id = "' + dag_input + '"',
>         'delete from dag_run where dag_id = "' + dag_input + '"',
>         'delete from dag where dag_id = "' + dag_input + '"' }
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
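
The endpoint behaviour listed in the comment above (404 for an unknown DAG, pause before delete, subdags removed with the parent, 204 on success) can be sketched as a plain function. This is only an illustration under stated assumptions: the dict-based registry, the audit list and the function name delete_dag_endpoint are hypothetical, and subdag ids are assumed to be the parent id followed by a dot and a child name. It is not the code from the PR.

```python
def delete_dag_endpoint(dag_id, dags, audit=None):
    """Return (status_code, body) for a delete request.

    dags is a hypothetical in-memory registry: dag_id -> {"paused": bool}.
    audit, if given, collects ("pause" | "delete", dag_id) tuples in order.
    """
    audit = audit if audit is not None else []

    if dag_id not in dags:
        # Non-existing DAG case: report 404 rather than failing silently.
        return 404, {"error": "Dag id {} not found".format(dag_id)}

    # Assumed naming convention: subdag ids start with "<parent id>.".
    prefix = dag_id + "."
    targets = [d for d in dags if d == dag_id or d.startswith(prefix)]

    # Pause everything first so no further tasks are scheduled mid-delete.
    for target in targets:
        dags[target]["paused"] = True
        audit.append(("pause", target))

    for target in targets:
        del dags[target]
        audit.append(("delete", target))

    # 204 No Content: the delete succeeded and there is no body to return.
    return 204, None
```

Matching on the dotted prefix rather than a bare startswith(dag_id) keeps an unrelated DAG such as etl_other untouched when etl is deleted, and deleting etl.load on its own leaves the parent in place.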