[
https://issues.apache.org/jira/browse/AIRFLOW-3479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713457#comment-16713457
]
ASF GitHub Bot commented on AIRFLOW-3479:
-
kaxil closed pull request #4287: [AIRFLOW-3479] Keeps records in Log Table when
delete DAG & refine related tests
URL: https://github.com/apache/incubator-airflow/pull/4287
This is a PR merged from a forked repository.
Because GitHub hides the original diff once a foreign pull request
(one from a fork) is merged, and it would not show otherwise, the diff
is reproduced below for the sake of provenance:
diff --git a/airflow/api/common/experimental/delete_dag.py
b/airflow/api/common/experimental/delete_dag.py
index b9ce736b48..e7d15772c9 100644
--- a/airflow/api/common/experimental/delete_dag.py
+++ b/airflow/api/common/experimental/delete_dag.py
@@ -23,7 +23,15 @@
from airflow.exceptions import DagNotFound, DagFileExists
-def delete_dag(dag_id):
+def delete_dag(dag_id, keep_records_in_log=True):
+"""
+:param dag_id: the dag_id of the DAG to delete
+:type dag_id: str
+:param keep_records_in_log: whether keep records of the given dag_id
+in the Log table in the backend database (for reasons like auditing).
+The default value is True.
+:type keep_records_in_log: bool
+"""
session = settings.Session()
DM = models.DagModel
@@ -41,6 +49,8 @@ def delete_dag(dag_id):
# noinspection PyUnresolvedReferences,PyProtectedMember
for m in models.Base._decl_class_registry.values():
if hasattr(m, "dag_id"):
+if keep_records_in_log and m.__name__ == 'Log':
+continue
cond = or_(m.dag_id == dag_id, m.dag_id.like(dag_id + ".%"))
count +=
session.query(m).filter(cond).delete(synchronize_session='fetch')
diff --git a/airflow/www/templates/airflow/dag.html
b/airflow/www/templates/airflow/dag.html
index b4c09de0b3..c34ec375b6 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -332,7 +332,7 @@
function confirmDeleteDag(dag_id){
return confirm("Are you sure you want to delete '"+dag_id+"' now?\n\
- This option will delete ALL metadata, DAG runs, etc.\n\
+ This option will delete ALL metadata, DAG runs, etc., EXCEPT Log\n\
This cannot be undone.");
}
diff --git a/tests/api/common/experimental/test_delete_dag.py
b/tests/api/common/experimental/test_delete_dag.py
new file mode 100644
index 0000000000..a012e5d3d0
--- /dev/null
+++ b/tests/api/common/experimental/test_delete_dag.py
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow import models
+from airflow import settings
+from airflow.api.common.experimental.delete_dag import delete_dag
+from airflow.exceptions import DagNotFound, DagFileExists
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.dates import days_ago
+from airflow.utils.state import State
+
+DM = models.DagModel
+DS = models.DagStat
+DR = models.DagRun
+TI = models.TaskInstance
+LOG = models.Log
+
+
+class TestDeleteDAGCatchError(unittest.TestCase):
+
+def setUp(self):
+self.session = settings.Session()
+self.dagbag = models.DagBag(include_examples=True)
+self.dag_id = 'example_bash_operator'
+self.dag = self.dagbag.dags[self.dag_id]
+
+def tearDown(self):
+self.dag.clear()
+self.session.close()
+
+def test_delete_dag_non_existent_dag(self):
+with self.assertRaises(DagNotFound):
+delete_dag("non-existent DAG")
+
+def test_delete_dag_dag_still_in_dagbag(self):
+models_to_check = ['DagModel', 'DagStat', 'DagRun', 'TaskInstance']
+record_counts = {}
+
+for model_name in models_to_check:
+m = getattr(models, model_name)
+record_counts[model_name] = self.session.query(m).filter(m.dag_id
== self.dag_id).count()
+
+with self.assertRaises(DagFileExists):
+delete_dag(self.dag_id)
+
+# No change should happen