This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 9f7c10b Fix deleting of zipped Dags in Serialized Dag Table (#18243) 9f7c10b is described below commit 9f7c10bb88b1dd18069ebb671ae7972cffd180d6 Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Wed Sep 15 14:33:40 2021 +0100 Fix deleting of zipped Dags in Serialized Dag Table (#18243) The file locations of DAGs in zipped folders are not correctly listed when removing deleted DAGs from the serialized DAG table. As a result, the query that deletes removed DAGs from the serialized DAG table also deletes DAGs that live inside zipped folders. DagCode.remove_deleted_code has the same problem. This PR fixes both by listing every DAG file path in the same form as stored in SDM (for a zip archive, the archive path joined with each contained file that might hold a DAG), so that the delete queries work properly. Co-authored-by: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com> --- airflow/dag_processing/manager.py | 24 +++++++++++++++++++++--- tests/dag_processing/test_manager.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index b235250..5c39277 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -26,6 +26,7 @@ import random import signal import sys import time +import zipfile from collections import defaultdict from datetime import datetime, timedelta from importlib import import_module @@ -45,7 +46,7 @@ from airflow.models.taskinstance import SimpleTaskInstance from airflow.stats import Stats from airflow.utils import timezone from airflow.utils.callback_requests import CallbackRequest, SlaCallbackRequest, TaskCallbackRequest -from airflow.utils.file import list_py_file_paths +from airflow.utils.file import list_py_file_paths, might_contain_dag from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.mixins import MultiprocessingStartMethodMixin from airflow.utils.net import get_hostname @@ -661,12 +662,29 @@ class 
DagFileProcessorManager(LoggingMixin): except Exception: self.log.exception("Error removing old import errors") - SerializedDagModel.remove_deleted_dags(self._file_paths) + # Check if file path is a zipfile and get the full path of the python file. + # Without this, SerializedDagModel.remove_deleted_files would delete zipped dags. + # Likewise DagCode.remove_deleted_code + dag_filelocs = [] + for fileloc in self._file_paths: + if zipfile.is_zipfile(fileloc): + with zipfile.ZipFile(fileloc) as z: + dag_filelocs.extend( + [ + os.path.join(fileloc, info.filename) + for info in z.infolist() + if might_contain_dag(info.filename, True, z) + ] + ) + else: + dag_filelocs.append(fileloc) + + SerializedDagModel.remove_deleted_dags(dag_filelocs) DagModel.deactivate_deleted_dags(self._file_paths) from airflow.models.dagcode import DagCode - DagCode.remove_deleted_code(self._file_paths) + DagCode.remove_deleted_code(dag_filelocs) def _print_stat(self): """Occasionally print out stats about how fast the files are getting processed""" diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 2c62939..3023fe5 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -45,6 +45,7 @@ from airflow.dag_processing.manager import ( from airflow.dag_processing.processor import DagFileProcessorProcess from airflow.jobs.local_task_job import LocalTaskJob as LJ from airflow.models import DagBag, DagModel, TaskInstance as TI, errors +from airflow.models.dagcode import DagCode from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskinstance import SimpleTaskInstance from airflow.utils import timezone @@ -54,6 +55,7 @@ from airflow.utils.session import create_session from airflow.utils.state import DagRunState, State from airflow.utils.types import DagRunType from tests.core.test_logging_config import SETTINGS_FILE_VALID, settings_context +from tests.models import TEST_DAGS_FOLDER from 
tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags @@ -110,9 +112,13 @@ class FakeDagFileProcessorRunner(DagFileProcessorProcess): class TestDagFileProcessorManager: def setup_method(self): clear_db_runs() + clear_db_serialized_dags() + clear_db_dags() def teardown_class(self): clear_db_runs() + clear_db_serialized_dags() + clear_db_dags() def run_processor_manager_one_loop(self, manager, parent_pipe): if not manager._async_mode: @@ -747,6 +753,30 @@ class TestDagFileProcessorManager: statsd_timing_mock.assert_called_with('dag_processing.last_duration.temp_dag', last_runtime) + def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir): + """Test DagFileProcessorManager._refresh_dag_dir method""" + manager = DagFileProcessorManager( + dag_directory=TEST_DAG_FOLDER, + max_runs=1, + processor_timeout=timedelta.max, + signal_conn=MagicMock(), + dag_ids=[], + pickle_dags=False, + async_mode=True, + ) + dagbag = DagBag(dag_folder=tmpdir, include_examples=False) + zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip") + dagbag.process_file(zipped_dag_path) + dag = dagbag.get_dag("test_zip_dag") + dag.sync_to_db() + SerializedDagModel.write_dag(dag) + manager.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10) + manager._refresh_dag_dir() + # Assert dag not deleted in SDM + assert SerializedDagModel.has_dag('test_zip_dag') + # assert code not delted + assert DagCode.has_dag(dag.fileloc) + class TestDagFileProcessorAgent(unittest.TestCase): def setUp(self):