Re: [PR] Add DAG importer abstraction layer [airflow]
kaxil merged PR #60127:
URL: https://github.com/apache/airflow/pull/60127
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Add DAG importer abstraction layer [airflow]
gopidesupavan commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2686556213
##
airflow-core/src/airflow/dag_processing/importers/python_importer.py:
##
@@ -0,0 +1,360 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Python DAG importer - imports DAGs from Python files."""
+
+from __future__ import annotations
+
+import contextlib
+import importlib
+import importlib.machinery
+import importlib.util
+import logging
+import os
+import signal
+import sys
+import traceback
+import warnings
+import zipfile
+from collections.abc import Iterator
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+from airflow import settings
+from airflow._shared.module_loading.file_discovery import find_path_from_directory
+from airflow.configuration import conf
+from airflow.dag_processing.importers.base import (
+    AbstractDagImporter,
+    DagImportError,
+    DagImportResult,
+    DagImportWarning,
+)
+from airflow.utils.docs import get_docs_url
+from airflow.utils.file import get_unique_dag_module_name, might_contain_dag
+
+if TYPE_CHECKING:
+    from types import ModuleType
+
+    from airflow.sdk import DAG
+
+log = logging.getLogger(__name__)
+
+
+@contextlib.contextmanager
+def _timeout(seconds: float = 1, error_message: str = "Timeout"):
+    """Context manager for timing out operations."""
+    error_message = error_message + ", PID: " + str(os.getpid())
+
+    def handle_timeout(signum, frame):
+        log.error("Process timed out, PID: %s", str(os.getpid()))
+        from airflow.sdk.exceptions import AirflowTaskTimeout
+
+        raise AirflowTaskTimeout(error_message)
+
+    try:
+        try:
+            signal.signal(signal.SIGALRM, handle_timeout)
+            signal.setitimer(signal.ITIMER_REAL, seconds)
+        except ValueError:
+            log.warning("timeout can't be used in the current context", exc_info=True)
+        yield
+    finally:
+        with contextlib.suppress(ValueError):
+            signal.setitimer(signal.ITIMER_REAL, 0)
+
+
+class PythonDagImporter(AbstractDagImporter):
+    """
+    Importer for Python DAG files and zip archives containing Python DAGs.
+
+    This is the default importer registered with the DagImporterRegistry. It handles:
+    - .py files: Standard Python DAG files
+    - .zip files: ZIP archives containing Python DAG files
+
+    Note: The .zip extension is exclusively owned by this importer. If you need to
+    support other file formats inside ZIP archives (e.g., YAML), you would need to
+    either extend this importer or create a composite importer that delegates based
+    on the contents of the archive.
+    """
+
+    @classmethod
+    def supported_extensions(cls) -> list[str]:
+        """Return file extensions handled by this importer (.py and .zip)."""
+        return [".py", ".zip"]
+
+    def list_dag_files(
+        self,
+        directory: str | os.PathLike[str],
+        safe_mode: bool = True,
+    ) -> Iterator[str]:
+        """
+        List Python DAG files in a directory.
+
+        Handles both .py files and .zip archives containing Python DAGs.
+        Respects .airflowignore files in the directory tree.
+        """
+        ignore_file_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")
+
+        for file_path in find_path_from_directory(directory, ".airflowignore", ignore_file_syntax):
+            path = Path(file_path)
+            try:
+                if path.is_file() and (path.suffix.lower() == ".py" or zipfile.is_zipfile(path)):
+                    if might_contain_dag(file_path, safe_mode):
+                        yield file_path
+            except Exception:
+                log.exception("Error while examining %s", file_path)
+
+    def import_file(
+        self,
+        file_path: str | Path,
+        *,
+        bundle_path: Path | None = None,
+        bundle_name: str | None = None,
+        safe_mode: bool = True,
+    ) -> DagImportResult:
+        """
+        Import DAGs from a Python file or zip archive.
+
+        :param file_path: Path to the Python file to import.
+        :param bundle_path: Path to the bundle root.
+        :param bundle_name
Re: [PR] Add DAG importer abstraction layer [airflow]
kaxil commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2668985254
##
airflow-core/src/airflow/dag_processing/importers/python_importer.py:
##
Re: [PR] Add DAG importer abstraction layer [airflow]
kaxil commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2668980105
##
airflow-core/src/airflow/dag_processing/importers/python_importer.py:
##
Re: [PR] Add DAG importer abstraction layer [airflow]
kaxil commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2668974857
##
airflow-core/tests/unit/dag_processing/test_dagbag.py:
##
@@ -452,7 +441,7 @@ def test_process_file_that_contains_multi_bytes_char(self, tmp_path):
         """
         test that we're able to parse file that contains multi-byte char
         """
-        path = tmp_path / "testfile"
+        path = tmp_path / "testfile.py"
Review Comment:
>Maybe those files should be assumed to be python files and parsed with python importer?

Not really, those were test issues -- not something we have supported officially
Re: [PR] Add DAG importer abstraction layer [airflow]
kaxil commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2668979074
##
airflow-core/src/airflow/dag_processing/importers/python_importer.py:
##
Re: [PR] Add DAG importer abstraction layer [airflow]
ephraimbuddy commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2668795193
##
airflow-core/tests/unit/dag_processing/test_dagbag.py:
##
@@ -452,7 +441,7 @@ def test_process_file_that_contains_multi_bytes_char(self, tmp_path):
         """
         test that we're able to parse file that contains multi-byte char
         """
-        path = tmp_path / "testfile"
+        path = tmp_path / "testfile.py"
Review Comment:
Does this mean that files without an explicit .py extension won't be parsed? Maybe those files should be assumed to be Python files and parsed with the python importer?
##
airflow-core/src/airflow/dag_processing/importers/python_importer.py:
##
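The discovery logic quoted in this thread keys off either a `.py` suffix or ZIP magic bytes (`zipfile.is_zipfile`), not the `.zip` extension itself. That second test can be sketched in isolation with the standard library; the file names below are illustrative only:

```python
import tempfile
import zipfile
from pathlib import Path

tmp = Path(tempfile.mkdtemp())

# A real ZIP archive: is_zipfile inspects the file's magic bytes.
archive = tmp / "dags.zip"
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("my_dag.py", "# dag code")
print(zipfile.is_zipfile(archive))  # True

# A plain text file that merely has a .zip name is rejected.
fake = tmp / "fake.zip"
fake.write_text("not an archive")
print(zipfile.is_zipfile(fake))  # False
```

Because the check reads file contents rather than trusting the suffix, a mislabeled archive is filtered out before any import is attempted.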
Re: [PR] Add DAG importer abstraction layer [airflow]
ikholopov-omni commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2662599016
##
airflow-core/src/airflow/dag_processing/importers/base.py:
##
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Abstract base class for DAG importers."""
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+from airflow.sdk import DAG
+
+
+@dataclass
+class DagImportError:
+"""Structured error information for DAG import failures."""
+
+file_path: str
+message: str
+error_type: str = "import"
+line_number: int | None = None
+column_number: int | None = None
+context: str | None = None
+suggestion: str | None = None
+stacktrace: str | None = None
+
+def format_message(self) -> str:
+"""Format the error as a human-readable string."""
+parts = [f"Error in {self.file_path}"]
+if self.line_number is not None:
+loc = f"line {self.line_number}"
+if self.column_number is not None:
+loc += f", column {self.column_number}"
+parts.append(f"Location: {loc}")
+parts.append(f"Error ({self.error_type}): {self.message}")
+if self.context:
+parts.append(f"Context:\n{self.context}")
+if self.suggestion:
+parts.append(f"Suggestion: {self.suggestion}")
+return "\n".join(parts)
+
+
+@dataclass
+class DagImportWarning:
+"""Warning information for non-fatal issues during DAG import."""
+
+file_path: str
+message: str
+warning_type: str = "general"
+line_number: int | None = None
+
+
+@dataclass
+class DagImportResult:
+"""Result of importing DAGs from a file."""
+
+file_path: str
+dags: list[DAG] = field(default_factory=list)
+errors: list[DagImportError] = field(default_factory=list)
+skipped_files: list[str] = field(default_factory=list)
+warnings: list[DagImportWarning] = field(default_factory=list)
+
+@property
+def success(self) -> bool:
+"""Return True if no fatal errors occurred."""
+return len(self.errors) == 0
+
+
+class AbstractDagImporter(ABC):
+"""Abstract base class for DAG importers."""
+
+@classmethod
+@abstractmethod
+def supported_extensions(cls) -> list[str]:
+"""Return file extensions this importer handles (e.g., ['.py',
'.zip'])."""
+
+@abstractmethod
+def import_file(
+self,
+file_path: str | Path,
+*,
+bundle_path: Path | None = None,
+bundle_name: str | None = None,
+safe_mode: bool = True,
+) -> DagImportResult:
+"""Import DAGs from a file."""
+
+def can_handle(self, file_path: str | Path) -> bool:
+"""Check if this importer can handle the given file."""
+path = Path(file_path) if isinstance(file_path, str) else file_path
+return path.suffix.lower() in self.supported_extensions()
+
+def get_relative_path(self, file_path: str | Path, bundle_path: Path |
None) -> str:
Review Comment:
Just confirming the intent, looks good as-is
Re: [PR] Add DAG importer abstraction layer [airflow]
Copilot commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2662429101
##
airflow-core/tests/unit/dag_processing/test_dagbag.py:
##
@@ -842,41 +831,27 @@ def test_timeout_dag_errors_are_import_errors(self, tmp_path, caplog):
         """
         Test that if the DAG contains Timeout error it will be still loaded to DB as import_errors
         """
-        code_to_save = """
-# Define Dag to load
+        dag_file = tmp_path / "timeout_dag.py"
+        dag_file.write_text("""
 import datetime
 import time
 import airflow
 from airflow.providers.standard.operators.python import PythonOperator
-time.sleep(1)
+time.sleep(1) # This will cause timeout during import
Review Comment:
Comment incorrectly states this will cause timeout during import, but the actual timeout is configured as 0.01 seconds (line 851) while the sleep is 1 second. The comment should reflect that this sleep exceeds the configured timeout threshold.
```suggestion
time.sleep(1)  # This sleep exceeds the configured DAGBAG_IMPORT_TIMEOUT during import
```
##
airflow-core/src/airflow/dag_processing/importers/base.py:
##
@@ -0,0 +1,266 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Abstract base class for DAG importers."""
+
+from __future__ import annotations
+
+import logging
+import os
+import threading
+from abc import ABC, abstractmethod
+from collections.abc import Iterator
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from airflow.sdk import DAG
+
+log = logging.getLogger(__name__)
+
+
+@dataclass
+class DagImportError:
+    """Structured error information for DAG import failures."""
+
+    file_path: str
+    message: str
+    error_type: str = "import"
+    line_number: int | None = None
+    column_number: int | None = None
+    context: str | None = None
+    suggestion: str | None = None
+    stacktrace: str | None = None
+
+    def format_message(self) -> str:
+        """Format the error as a human-readable string."""
+        parts = [f"Error in {self.file_path}"]
+        if self.line_number is not None:
+            loc = f"line {self.line_number}"
+            if self.column_number is not None:
+                loc += f", column {self.column_number}"
+            parts.append(f"Location: {loc}")
+        parts.append(f"Error ({self.error_type}): {self.message}")
+        if self.context:
+            parts.append(f"Context:\n{self.context}")
+        if self.suggestion:
+            parts.append(f"Suggestion: {self.suggestion}")
+        return "\n".join(parts)
+
+
+@dataclass
+class DagImportWarning:
+    """Warning information for non-fatal issues during DAG import."""
+
+    file_path: str
+    message: str
+    warning_type: str = "general"
+    line_number: int | None = None
+
+
+@dataclass
+class DagImportResult:
+    """Result of importing DAGs from a file."""
+
+    file_path: str
+    dags: list[DAG] = field(default_factory=list)
+    errors: list[DagImportError] = field(default_factory=list)
+    skipped_files: list[str] = field(default_factory=list)
+    warnings: list[DagImportWarning] = field(default_factory=list)
+
+    @property
+    def success(self) -> bool:
+        """Return True if no fatal errors occurred."""
+        return len(self.errors) == 0
+
+
+class AbstractDagImporter(ABC):
+    """Abstract base class for DAG importers."""
+
+    @classmethod
+    @abstractmethod
+    def supported_extensions(cls) -> list[str]:
+        """Return file extensions this importer handles (e.g., ['.py', '.zip'])."""
+
+    @abstractmethod
+    def import_file(
+        self,
+        file_path: str | Path,
+        *,
+        bundle_path: Path | None = None,
+        bundle_name: str | None = None,
+        safe_mode: bool = True,
+    ) -> DagImportResult:
+        """Import DAGs from a file."""
+
+    def can_handle(self, file_path: str | Path) -> bool:
+        """Check if this importer can handle the given file."""
+        path = Path(file_path) if isinstance(file_path, str) else file_path
+        return path.suffix.lower() in self.supported_extensions()
+
+    def get_relative_path(self, file_path: str | Path, bundle_path: Path | None) -> str:
+        """Get the relative file path from the bundle root."""
+        if bundle_path is None:
+            return str(file_path)
+        try:
+            return str(Path(file_path).relative_to(bundle_path))
+        except ValueError:
+            return str(file_path)
Re: [PR] Add DAG importer abstraction layer [airflow]
kaxil commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2662423612
##
airflow-core/src/airflow/dag_processing/importers/base.py:
##
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Abstract base class for DAG importers."""
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+from airflow.sdk import DAG
+
+
+@dataclass
+class DagImportError:
+"""Structured error information for DAG import failures."""
+
+file_path: str
+message: str
+error_type: str = "import"
+line_number: int | None = None
+column_number: int | None = None
+context: str | None = None
+suggestion: str | None = None
+stacktrace: str | None = None
+
+def format_message(self) -> str:
+"""Format the error as a human-readable string."""
+parts = [f"Error in {self.file_path}"]
+if self.line_number is not None:
+loc = f"line {self.line_number}"
+if self.column_number is not None:
+loc += f", column {self.column_number}"
+parts.append(f"Location: {loc}")
+parts.append(f"Error ({self.error_type}): {self.message}")
+if self.context:
+parts.append(f"Context:\n{self.context}")
+if self.suggestion:
+parts.append(f"Suggestion: {self.suggestion}")
+return "\n".join(parts)
+
+
+@dataclass
+class DagImportWarning:
+"""Warning information for non-fatal issues during DAG import."""
+
+file_path: str
+message: str
+warning_type: str = "general"
+line_number: int | None = None
+
+
+@dataclass
+class DagImportResult:
+"""Result of importing DAGs from a file."""
+
+file_path: str
+dags: list[DAG] = field(default_factory=list)
+errors: list[DagImportError] = field(default_factory=list)
+skipped_files: list[str] = field(default_factory=list)
+warnings: list[DagImportWarning] = field(default_factory=list)
+
+@property
+def success(self) -> bool:
+"""Return True if no fatal errors occurred."""
+return len(self.errors) == 0
+
+
+class AbstractDagImporter(ABC):
+"""Abstract base class for DAG importers."""
+
+@classmethod
+@abstractmethod
+def supported_extensions(cls) -> list[str]:
+"""Return file extensions this importer handles (e.g., ['.py',
'.zip'])."""
+
+@abstractmethod
+def import_file(
+self,
+file_path: str | Path,
+*,
+bundle_path: Path | None = None,
+bundle_name: str | None = None,
+safe_mode: bool = True,
+) -> DagImportResult:
+"""Import DAGs from a file."""
+
+def can_handle(self, file_path: str | Path) -> bool:
+"""Check if this importer can handle the given file."""
+path = Path(file_path) if isinstance(file_path, str) else file_path
+return path.suffix.lower() in self.supported_extensions()
+
+def get_relative_path(self, file_path: str | Path, bundle_path: Path |
None) -> str:
+"""Get the relative file path from the bundle root."""
+if bundle_path is None:
+return str(file_path)
+try:
+return str(Path(file_path).relative_to(bundle_path))
+except ValueError:
+return str(file_path)
+
+
+class DagImporterRegistry:
+"""Registry for DAG importers. Singleton that manages importers by file
extension."""
+
+_instance: DagImporterRegistry | None = None
+_importers: dict[str, AbstractDagImporter]
+
+def __new__(cls) -> DagImporterRegistry:
+if cls._instance is None:
+cls._instance = super().__new__(cls)
+cls._instance._importers = {}
+cls._instance._register_default_importers()
+return cls._instance
+
+def _register_default_importers(self) -> None:
+from airflow.dag_processing.importers.python_importer import
PythonDagImporter
+
+self.register(PythonDagImporter())
+
+def register(self, importer: AbstractDagImporter) -> None:
+"""Register an importer for its suppo
Re: [PR] Add DAG importer abstraction layer [airflow]
kaxil commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2662421646
##
airflow-core/src/airflow/dag_processing/importers/base.py:
##
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Abstract base class for DAG importers."""
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+from airflow.sdk import DAG
+
+
+@dataclass
+class DagImportError:
+"""Structured error information for DAG import failures."""
+
+file_path: str
+message: str
+error_type: str = "import"
+line_number: int | None = None
+column_number: int | None = None
+context: str | None = None
+suggestion: str | None = None
+stacktrace: str | None = None
+
+def format_message(self) -> str:
+"""Format the error as a human-readable string."""
+parts = [f"Error in {self.file_path}"]
+if self.line_number is not None:
+loc = f"line {self.line_number}"
+if self.column_number is not None:
+loc += f", column {self.column_number}"
+parts.append(f"Location: {loc}")
+parts.append(f"Error ({self.error_type}): {self.message}")
+if self.context:
+parts.append(f"Context:\n{self.context}")
+if self.suggestion:
+parts.append(f"Suggestion: {self.suggestion}")
+return "\n".join(parts)
+
+
+@dataclass
+class DagImportWarning:
+"""Warning information for non-fatal issues during DAG import."""
+
+file_path: str
+message: str
+warning_type: str = "general"
+line_number: int | None = None
+
+
+@dataclass
+class DagImportResult:
+"""Result of importing DAGs from a file."""
+
+file_path: str
+dags: list[DAG] = field(default_factory=list)
+errors: list[DagImportError] = field(default_factory=list)
+skipped_files: list[str] = field(default_factory=list)
+warnings: list[DagImportWarning] = field(default_factory=list)
+
+@property
+def success(self) -> bool:
+"""Return True if no fatal errors occurred."""
+return len(self.errors) == 0
+
+
+class AbstractDagImporter(ABC):
+"""Abstract base class for DAG importers."""
+
+@classmethod
+@abstractmethod
+def supported_extensions(cls) -> list[str]:
+"""Return file extensions this importer handles (e.g., ['.py',
'.zip'])."""
+
+@abstractmethod
+def import_file(
+self,
+file_path: str | Path,
+*,
+bundle_path: Path | None = None,
+bundle_name: str | None = None,
+safe_mode: bool = True,
+) -> DagImportResult:
+"""Import DAGs from a file."""
+
+def can_handle(self, file_path: str | Path) -> bool:
+"""Check if this importer can handle the given file."""
+path = Path(file_path) if isinstance(file_path, str) else file_path
+return path.suffix.lower() in self.supported_extensions()
+
+def get_relative_path(self, file_path: str | Path, bundle_path: Path |
None) -> str:
+"""Get the relative file path from the bundle root."""
+if bundle_path is None:
+return str(file_path)
+try:
+return str(Path(file_path).relative_to(bundle_path))
+except ValueError:
+return str(file_path)
+
+
+class DagImporterRegistry:
+"""Registry for DAG importers. Singleton that manages importers by file
extension."""
+
+_instance: DagImporterRegistry | None = None
+_importers: dict[str, AbstractDagImporter]
+
+def __new__(cls) -> DagImporterRegistry:
+if cls._instance is None:
+cls._instance = super().__new__(cls)
+cls._instance._importers = {}
+cls._instance._register_default_importers()
+return cls._instance
+
+def _register_default_importers(self) -> None:
+from airflow.dag_processing.importers.python_importer import
PythonDagImporter
+
+self.register(PythonDagImporter())
+
+def register(self, importer: AbstractDagImporter) -> None:
+"""Register an importer for its suppo
Re: [PR] Add DAG importer abstraction layer [airflow]
kaxil commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2662417533
##
airflow-core/src/airflow/dag_processing/importers/base.py:
##
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Abstract base class for DAG importers."""
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+from airflow.sdk import DAG
+
+
+@dataclass
+class DagImportError:
+"""Structured error information for DAG import failures."""
+
+file_path: str
+message: str
+error_type: str = "import"
+line_number: int | None = None
+column_number: int | None = None
+context: str | None = None
+suggestion: str | None = None
+stacktrace: str | None = None
+
+def format_message(self) -> str:
+"""Format the error as a human-readable string."""
+parts = [f"Error in {self.file_path}"]
+if self.line_number is not None:
+loc = f"line {self.line_number}"
+if self.column_number is not None:
+loc += f", column {self.column_number}"
+parts.append(f"Location: {loc}")
+parts.append(f"Error ({self.error_type}): {self.message}")
+if self.context:
+parts.append(f"Context:\n{self.context}")
+if self.suggestion:
+parts.append(f"Suggestion: {self.suggestion}")
+return "\n".join(parts)
+
+
+@dataclass
+class DagImportWarning:
+"""Warning information for non-fatal issues during DAG import."""
+
+file_path: str
+message: str
+warning_type: str = "general"
+line_number: int | None = None
+
+
+@dataclass
+class DagImportResult:
+"""Result of importing DAGs from a file."""
+
+file_path: str
+dags: list[DAG] = field(default_factory=list)
+errors: list[DagImportError] = field(default_factory=list)
+skipped_files: list[str] = field(default_factory=list)
+warnings: list[DagImportWarning] = field(default_factory=list)
+
+@property
+def success(self) -> bool:
+"""Return True if no fatal errors occurred."""
+return len(self.errors) == 0
+
+
+class AbstractDagImporter(ABC):
+"""Abstract base class for DAG importers."""
+
+@classmethod
+@abstractmethod
+def supported_extensions(cls) -> list[str]:
+"""Return file extensions this importer handles (e.g., ['.py',
'.zip'])."""
+
+@abstractmethod
+def import_file(
+self,
+file_path: str | Path,
+*,
+bundle_path: Path | None = None,
+bundle_name: str | None = None,
+safe_mode: bool = True,
+) -> DagImportResult:
+"""Import DAGs from a file."""
+
+def can_handle(self, file_path: str | Path) -> bool:
+"""Check if this importer can handle the given file."""
+path = Path(file_path) if isinstance(file_path, str) else file_path
+return path.suffix.lower() in self.supported_extensions()
+
+def get_relative_path(self, file_path: str | Path, bundle_path: Path |
None) -> str:
Review Comment:
You are right that it could be a standalone function. I kept it as a method for two reasons:
1. Extensibility: some importers might need custom path resolution (e.g., nested archives, virtual file systems).
2. Encapsulation: it keeps the path-related logic close to the importer that uses it.
That said, I am happy to extract it to a utility function if you feel strongly about it; the default implementation is generic enough.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Add DAG importer abstraction layer [airflow]
ikholopov-omni commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2662154326
##
airflow-core/src/airflow/dag_processing/importers/base.py:
##
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Abstract base class for DAG importers."""
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+from airflow.sdk import DAG
+
+
+@dataclass
+class DagImportError:
+"""Structured error information for DAG import failures."""
+
+file_path: str
+message: str
+error_type: str = "import"
+line_number: int | None = None
+column_number: int | None = None
+context: str | None = None
+suggestion: str | None = None
+stacktrace: str | None = None
+
+def format_message(self) -> str:
+"""Format the error as a human-readable string."""
+parts = [f"Error in {self.file_path}"]
+if self.line_number is not None:
+loc = f"line {self.line_number}"
+if self.column_number is not None:
+loc += f", column {self.column_number}"
+parts.append(f"Location: {loc}")
+parts.append(f"Error ({self.error_type}): {self.message}")
+if self.context:
+parts.append(f"Context:\n{self.context}")
+if self.suggestion:
+parts.append(f"Suggestion: {self.suggestion}")
+return "\n".join(parts)
+
+
+@dataclass
+class DagImportWarning:
+"""Warning information for non-fatal issues during DAG import."""
+
+file_path: str
+message: str
+warning_type: str = "general"
+line_number: int | None = None
+
+
+@dataclass
+class DagImportResult:
+"""Result of importing DAGs from a file."""
+
+file_path: str
+dags: list[DAG] = field(default_factory=list)
+errors: list[DagImportError] = field(default_factory=list)
+skipped_files: list[str] = field(default_factory=list)
+warnings: list[DagImportWarning] = field(default_factory=list)
+
+@property
+def success(self) -> bool:
+"""Return True if no fatal errors occurred."""
+return len(self.errors) == 0
+
+
+class AbstractDagImporter(ABC):
+"""Abstract base class for DAG importers."""
+
+@classmethod
+@abstractmethod
+def supported_extensions(cls) -> list[str]:
+"""Return file extensions this importer handles (e.g., ['.py',
'.zip'])."""
+
+@abstractmethod
+def import_file(
+self,
+file_path: str | Path,
+*,
+bundle_path: Path | None = None,
+bundle_name: str | None = None,
+safe_mode: bool = True,
+) -> DagImportResult:
+"""Import DAGs from a file."""
+
+def can_handle(self, file_path: str | Path) -> bool:
+"""Check if this importer can handle the given file."""
+path = Path(file_path) if isinstance(file_path, str) else file_path
+return path.suffix.lower() in self.supported_extensions()
+
+def get_relative_path(self, file_path: str | Path, bundle_path: Path |
None) -> str:
Review Comment:
Is the goal of this method to allow an importer to override the file_path/bundle_path resolution? If not, it can probably be an independent function.
Re: [PR] Add DAG importer abstraction layer [airflow]
Copilot commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2661971525
##
airflow-core/src/airflow/dag_processing/dagbag.py:
##
@@ -339,29 +331,62 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
             self.log.exception(e)
             return []
-        # Ensure we don't pick up anything else we didn't mean to
-        DagContext.autoregistered_dags.clear()
-
         self.captured_warnings.pop(filepath, None)
-        with _capture_with_reraise() as captured_warnings:
-            if filepath.endswith(".py") or not zipfile.is_zipfile(filepath):
-                mods = self._load_modules_from_file(filepath, safe_mode)
-            else:
-                mods = self._load_modules_from_zip(filepath, safe_mode)
-        if captured_warnings:
-            formatted_warnings = []
-            for msg in captured_warnings:
-                category = msg.category.__name__
-                if (module := msg.category.__module__) != "builtins":
-                    category = f"{module}.{category}"
-                formatted_warnings.append(f"{msg.filename}:{msg.lineno}: {category}: {msg.message}")
+        from airflow.dag_processing.importers import get_importer_registry
+
+        registry = get_importer_registry()
+        importer = registry.get_importer(filepath)
+
+        if importer is None:
+            self.log.debug("No importer found for file: %s", filepath)
+            return []
+
+        result = importer.import_file(
+            file_path=filepath,
+            bundle_path=Path(self.dag_folder) if self.dag_folder else None,
+            bundle_name=self.bundle_name,
+            safe_mode=safe_mode,
+        )
+
+        if result.skipped_files:
+            for skipped in result.skipped_files:
+                if not self.has_logged:
+                    self.has_logged = True
+                    self.log.info("File %s assumed to contain no DAGs. Skipping.", skipped)
+
+        if result.errors:
+            for error in result.errors:
+                # Use the file path from error for ZIP files (contains zip/file.py format)
+                # For regular files, use the original filepath
+                if zipfile.is_zipfile(filepath):
+                    error_path = error.file_path if error.file_path else filepath
+                else:
+                    error_path = filepath
+                error_msg = error.stacktrace if error.stacktrace else error.message
+                self.import_errors[error_path] = error_msg
+                self.log.error("Error loading DAG from %s: %s", error_path, error.message)
+
+        if result.warnings:
+            formatted_warnings = [
+                f"{w.file_path}:{w.line_number}: {w.warning_type}: {w.message}" for w in result.warnings
+            ]
             self.captured_warnings[filepath] = tuple(formatted_warnings)
-        found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk)
+        bagged_dags = []
+        for dag in result.dags:
+            try:
+                # Only set fileloc if not already set by importer (ZIP files have path inside archive)
+                if not dag.fileloc:
Review Comment:
The comment on line 379 states 'Only set fileloc if not already set by importer (ZIP files have path inside archive)', but this logic may not handle all cases correctly. If a DAG's fileloc is explicitly set to an empty string or another falsy value by the importer, this condition would incorrectly override it. Consider checking for None specifically or clarifying the expected behavior.
```suggestion
                if dag.fileloc is None:
```
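The difference the suggestion is after can be shown in isolation. The `fill_fileloc_*` helpers below are illustrative stand-ins for the two checks, not code from the PR:

```python
def fill_fileloc_truthiness(dag_fileloc, filepath):
    # The quoted `if not dag.fileloc:` check: any falsy value, an empty
    # string included, is replaced with the processed file path.
    return filepath if not dag_fileloc else dag_fileloc


def fill_fileloc_none_check(dag_fileloc, filepath):
    # Copilot's `if dag.fileloc is None:` suggestion: fill in only when
    # the importer genuinely left fileloc unset.
    return filepath if dag_fileloc is None else dag_fileloc


print(repr(fill_fileloc_truthiness("", "/dags/a.py")))  # '/dags/a.py' (empty string clobbered)
print(repr(fill_fileloc_none_check("", "/dags/a.py")))  # '' (empty string preserved)
```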
##
airflow-core/src/airflow/dag_processing/importers/python_importer.py:
##
@@ -0,0 +1,430 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Python DAG importer - imports DAGs from Python files."""
+
+from __future__ import annotations
+
+import contextlib
+import importlib
+import importlib.machinery
+import importlib.util
+import logging
+import os
+import signal
+import sys
+import traceback
+import warnings
+import zipfile
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
Re: [PR] Add DAG importer abstraction layer [airflow]
kaxil commented on PR #60127: URL: https://github.com/apache/airflow/pull/60127#issuecomment-3710986592
cc @IKholopov
