[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r353160974 ## File path: UPDATING.md ## @@ -41,6 +41,30 @@ assists users migrating to a new version. ## Airflow Master +### Changes to airflow package imports + +Importing `executor`, `hooks`, `macros`, `operators`, `sensors` packages no +longer works indirectly via airflow package. For example this will not work: + +```python +import airflow + +operator = airflow.operators.BashOperator(...) Review comment: `airflow.operators.BashOperator` was relying on the "load all operators from submodules and stick them on the operators module" behaviour, and users were already warned about it in the `Removed deprecated import mechanism` section of UPDATING.md, which shows the same import paths you've got here. So we might not need this section (though that one could be updated to say it affects the other packages too). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r353159009 ## File path: tests/operators/test_google_api_to_s3_transfer.py ## @@ -29,7 +30,7 @@ class TestGoogleApiToS3Transfer(unittest.TestCase): def setUp(self): -configuration.load_test_config() +load_test_config() Review comment: Remove here too.
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r353158679 ## File path: tests/gcp/hooks/test_google_discovery_api.py ## @@ -20,15 +20,16 @@ import unittest from unittest.mock import call, patch -from airflow import configuration, models +from airflow import models +from airflow.configuration import load_test_config from airflow.gcp.hooks.discovery_api import GoogleDiscoveryApiHook from airflow.utils import db class TestGoogleDiscoveryApiHook(unittest.TestCase): def setUp(self): -configuration.load_test_config() +load_test_config() Review comment: I thought I got rid of all of these. This isn't needed any longer and can just be removed entirely. ```suggestion ```
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r353156998 ## File path: airflow/utils/dag_processing.py ## @@ -724,26 +530,27 @@ class DagFileProcessorManager(LoggingMixin): """ def __init__(self, - dag_directory, - file_paths, - max_runs, - processor_factory, - processor_timeout, - signal_conn, - async_mode=True): -self._file_paths = file_paths -self._file_path_queue = [] -self._dag_directory = dag_directory -self._max_runs = max_runs -self._processor_factory = processor_factory -self._signal_conn = signal_conn -self._async_mode = async_mode + dag_directory: str, + file_paths: List[str], + max_runs: int, + processor_factory: Callable[[str, List[Any]], AbstractDagFileProcessor], + processor_timeout: timedelta, + signal_conn: Connection, + async_mode: bool = True): +self._file_paths: List[str] = file_paths +self._file_path_queue: List[str] = [] +self._dag_directory: str = dag_directory +self._max_runs: int = max_runs +self._processor_factory: Callable[[str, List[Any]], AbstractDagFileProcessor] = processor_factory +self._signal_conn: Connection = signal_conn +self._async_mode: bool = async_mode Review comment: Style point that we should decide upon: in cases like this, where the function arg is typed, we don't also need to specify the type on the instance attribute - mypy will infer it automatically.
So this could be written as:
```
def __init__(self,
             dag_directory: str,
             file_paths: List[str],
             max_runs: int,
             processor_factory: Callable[[str, List[Any]], AbstractDagFileProcessor],
             processor_timeout: timedelta,
             signal_conn: Connection,
             async_mode: bool = True):
    self._file_paths = file_paths
    # An empty list gives mypy nothing to infer from, so this one keeps its annotation.
    self._file_path_queue: List[str] = []
    self._dag_directory = dag_directory
    self._max_runs = max_runs
    self._processor_factory = processor_factory
    self._signal_conn = signal_conn
    self._async_mode = async_mode
```
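A minimal, self-contained sketch of the style point above, using a hypothetical class (not Airflow's `DagFileProcessorManager`): attributes assigned straight from annotated parameters need no annotation of their own, while an empty container still does, because mypy has no element type to infer from.

```python
from typing import List


class FileQueueSketch:
    """Hypothetical class illustrating mypy's attribute-type inference."""

    def __init__(self, file_paths: List[str], max_runs: int) -> None:
        # mypy infers List[str] and int from the annotated parameters,
        # so the attributes themselves need no annotation.
        self._file_paths = file_paths
        self._max_runs = max_runs
        # An empty literal gives mypy nothing to infer from: annotate it.
        self._file_path_queue: List[str] = []

    def enqueue_all(self) -> int:
        """Queue every known file path and return the queue length."""
        self._file_path_queue.extend(self._file_paths)
        return len(self._file_path_queue)


queue_sketch = FileQueueSketch(["a.py", "b.py"], max_runs=1)
print(queue_sketch.enqueue_all())  # 2
```

Running mypy over this should accept it without annotations on `_file_paths` or `_max_runs`; dropping the annotation on `_file_path_queue` is what typically triggers a "Need type annotation" error.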
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r353154607 ## File path: airflow/executors/dask_executor.py ## @@ -16,36 +15,37 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""Dask executor.""" import subprocess -import warnings +from typing import Any, Dict, Optional -import distributed +from distributed import Client, Future, as_completed +from distributed.security import Security from airflow.configuration import conf -from airflow.executors.base_executor import BaseExecutor +from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType +from airflow.models.taskinstance import TaskInstanceKeyType class DaskExecutor(BaseExecutor): """ DaskExecutor submits tasks to a Dask Distributed cluster. """ def __init__(self, cluster_address=None): +super().__init__(parallelism=0) if cluster_address is None: cluster_address = conf.get('dask', 'cluster_address') -if not cluster_address: -raise ValueError( -'Please provide a Dask cluster address in airflow.cfg') +assert cluster_address, 'Please provide a Dask cluster address in airflow.cfg' self.cluster_address = cluster_address # ssl / tls parameters self.tls_ca = conf.get('dask', 'tls_ca') self.tls_key = conf.get('dask', 'tls_key') self.tls_cert = conf.get('dask', 'tls_cert') -super().__init__(parallelism=0) +self.client: Optional[Client] = None +self.futures: Optional[Dict[Optional[Future], TaskInstanceKeyType]] = None Review comment: ```suggestion self.futures: Optional[Dict[Future, TaskInstanceKeyType]] = None ``` Having an Optional key type seems odd?
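To illustrate why the key needs no `Optional`, here is a sketch that uses the standard library's `concurrent.futures` as a stand-in for Dask's `distributed.Client`/`as_completed` (which follow a similar submit-then-collect pattern). `TaskKey` and `square` are hypothetical. Every key in the mapping is a real future returned by `submit()`; "not started yet" belongs on the container (`Optional[Dict[...]]`), not on its keys.

```python
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Dict, Tuple

# Hypothetical stand-in for TaskInstanceKeyType.
TaskKey = Tuple[str, str]


def square(n: int) -> int:
    return n * n


def run_all(work: Dict[TaskKey, int]) -> Dict[TaskKey, int]:
    """Submit each unit of work and map every finished future back to its key."""
    results: Dict[TaskKey, int] = {}
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Keys are always real futures from submit(), never None.
        futures: Dict["Future[int]", TaskKey] = {
            pool.submit(square, n): key for key, n in work.items()
        }
        # Iterate over a snapshot so popping from the dict is safe.
        for future in as_completed(list(futures)):
            key = futures.pop(future)
            results[key] = future.result()
    return results
```

For example, `run_all({("dag", "t1"): 3})` returns `{("dag", "t1"): 9}`.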
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r353154297 ## File path: airflow/executors/celery_executor.py ## @@ -264,30 +284,44 @@ def sync(self): ) continue key, state = key_and_state -try: -if self.last_state[key] != state: -if state == celery_states.SUCCESS: -self.success(key) -del self.tasks[key] -del self.last_state[key] -elif state == celery_states.FAILURE: -self.fail(key) -del self.tasks[key] -del self.last_state[key] -elif state == celery_states.REVOKED: -self.fail(key) -del self.tasks[key] -del self.last_state[key] -else: -self.log.info("Unexpected state: %s", state) -self.last_state[key] = state -except Exception: -self.log.exception("Error syncing the Celery executor, ignoring it.") - -def end(self, synchronous=False): +self.update_task_state(key, state) + +def update_task_state(self, key: TaskInstanceKeyType, state: str) -> None: +"""Updates state of a single task.""" +# noinspection PyBroadException +try: +if self.last_state[key] != state: +if state == celery_states.SUCCESS: +self.success(key) +del self.tasks[key] +del self.last_state[key] +elif state == celery_states.FAILURE: +self.fail(key) +del self.tasks[key] +del self.last_state[key] +elif state == celery_states.REVOKED: +self.fail(key) +del self.tasks[key] +del self.last_state[key] +else: +self.log.info("Unexpected state: %s", state) +self.last_state[key] = state +except Exception: # pylint: disable=broad-except +self.log.exception("Error syncing the Celery executor, ignoring it.") + +def end(self, synchronous: bool = False) -> None: if synchronous: -while any([ -task.state not in celery_states.READY_STATES -for task in self.tasks.values()]): +while any([task.state not in celery_states.READY_STATES for task in self.tasks.values()]): time.sleep(5) self.sync() + +def execute_async(self, + key: TaskInstanceKeyType, + command: CommandType, + queue: Optional[str] = None, + 
executor_config: Optional[Any] = None): +"""Do not allow async execution for Celery executor.""" +raise AirflowException("No Async execution for Celery executor.") + +def terminate(self): +"""Terminate the executor is not doing anything.""" Review comment: ```suggestion pass ```
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r353152675 ## File path: UPDATING.md ## @@ -41,6 +41,30 @@ assists users migrating to a new version. ## Airflow Master +### Changes to airflow package imports + +Importing `executor`, `hooks`, `macros`, `operators`, `sensors` packages no +longer works indirectly via airflow package. For example this will not work: + +```python +import airflow + +operator = airflow.operators.BashOperator(...) Review comment: Hasn't this already been deprecated? I guess this did actually work, but issued a warning, before this PR?
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r353151215 ## File path: airflow/executors/dask_executor.py ## @@ -66,20 +63,17 @@ def execute_async(self, command: CommandType, queue: Optional[str] = None, executor_config: Optional[Any] = None) -> None: -if not self.futures: -raise AirflowException("Executor should be started first.") +assert self.futures, NOT_STARTED_MESSAGE Review comment: I think this is a good case for asserts. Under normal circumstances they should pass, and it's really only during development that you might get this wrong, so making these asserts that can be disabled with `python -O` seems like the right thing to do.
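A sketch of the assert-as-precondition style, with a hypothetical `SketchExecutor` (not Airflow's `DaskExecutor`) and hypothetical message text. It also shows that `python -O` really strips asserts. One detail worth double-checking against the diff: `start()` in this file assigns `self.futures = {}`, and an empty dict is falsy, so a truthiness check such as `assert self.futures` would fail right after a successful start. The sketch therefore tests `is not None`.

```python
import subprocess
import sys
from typing import Dict, Optional

# Hypothetical message text; the real NOT_STARTED_MESSAGE lives in base_executor.
NOT_STARTED_MESSAGE = "The executor should be started first!"


class SketchExecutor:
    """Hypothetical executor illustrating assert-based preconditions."""

    def __init__(self) -> None:
        self.futures: Optional[Dict[str, str]] = None

    def start(self) -> None:
        self.futures = {}

    def execute_async(self, key: str) -> None:
        # Developer-facing precondition, compiled out entirely under `python -O`.
        # `is not None` rather than truthiness: the freshly started dict is empty.
        assert self.futures is not None, NOT_STARTED_MESSAGE
        self.futures[key] = "submitted"


def asserts_survive_optimize() -> bool:
    """Run a child interpreter with -O and report whether an assert still fires."""
    result = subprocess.run(
        [sys.executable, "-O", "-c", "assert False, 'boom'"],
        capture_output=True,
    )
    return result.returncode != 0
```

Under `-O`, the child process exits cleanly, so `asserts_survive_optimize()` returns `False`. That is exactly why asserts suit programmer errors and not conditions users can trigger.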
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352568182 ## File path: airflow/settings.py ## @@ -68,13 +67,13 @@ LOG_FORMAT = conf.get('core', 'log_format') SIMPLE_LOG_FORMAT = conf.get('core', 'simple_log_format') -SQL_ALCHEMY_CONN = None # type: Optional[str] -DAGS_FOLDER = None # type: Optional[str] -PLUGINS_FOLDER = None # type: Optional[str] -LOGGING_CLASS_PATH = None # type: Optional[str] +SQL_ALCHEMY_CONN: Optional[str] = None Review comment: Unrelated changes in this file now?
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352567794 ## File path: airflow/plugins_manager.py ## @@ -16,17 +15,18 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - -import imp +"""Manages all plugins.""" +# noinspection PyDeprecation +import imp # pylint: disable=deprecated-module import inspect import os import re -from typing import Any, List +import sys +from typing import Any, Callable, List, Optional import pkg_resources from airflow import settings -from airflow.models.baseoperator import BaseOperatorLink Review comment: We're removing this one and the tighter type integration because otherwise this pulls in all of airflow.models, which we want to avoid? Is this worth using an `if TYPE_CHECKING:` for?
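The standard-library spelling of this guard is `typing.TYPE_CHECKING`, which is `False` at runtime and treated as `True` by type checkers. The sketch below keeps the precise `List[BaseOperatorLink]` types while never importing `airflow.models` at runtime. `register_link` is a hypothetical helper, not part of `plugins_manager`.

```python
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
    # Seen only by type checkers such as mypy; never executed at runtime,
    # so importing this module does not pull in airflow.models.
    from airflow.models.baseoperator import BaseOperatorLink

# Quoted annotations are not evaluated at runtime, so the name need not exist.
global_operator_extra_links: "List[BaseOperatorLink]" = []
operator_extra_links: "List[BaseOperatorLink]" = []


def register_link(link: "BaseOperatorLink") -> None:
    """Hypothetical helper: record an extra link contributed by a plugin."""
    operator_extra_links.append(link)
```

Because the guarded import never runs, this module loads even where Airflow is not installed, while mypy still checks the element type of both lists.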
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352567425 ## File path: airflow/plugins_manager.py ## @@ -187,26 +192,29 @@ def make_module(name, objects): macros_modules = [] # Plugin components to integrate directly -admin_views = [] # type: List[Any] -flask_blueprints = [] # type: List[Any] -menu_links = [] # type: List[Any] -flask_appbuilder_views = [] # type: List[Any] -flask_appbuilder_menu_links = [] # type: List[Any] -stat_name_handler = None # type: Any -global_operator_extra_links = [] # type: List[BaseOperatorLink] -operator_extra_links = [] # type: List[BaseOperatorLink] +admin_views: List[Any] = [] +flask_blueprints: List[Any] = [] +menu_links: List[Any] = [] +flask_appbuilder_views: List[Any] = [] +flask_appbuilder_menu_links: List[Any] = [] +stat_name_handler: Any = None Review comment: ```suggestion stat_name_handler: Optional[Callable[[str], str]] = None ```
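The suggested annotation says that a stat-name handler, if a plugin provides one, takes the raw stat name and returns a string. A minimal sketch: `sanitize_stat_name` and `emit_name` are hypothetical and only show what conforms to `Callable[[str], str]` and how `Optional` forces callers to handle the "no handler" case.

```python
from typing import Callable, Optional

stat_name_handler: Optional[Callable[[str], str]] = None


def sanitize_stat_name(stat_name: str) -> str:
    """Hypothetical handler: lowercase the name and replace spaces."""
    return stat_name.lower().replace(" ", "_")


def emit_name(raw: str, handler: Optional[Callable[[str], str]] = None) -> str:
    # Optional means the "no handler registered" case must be handled explicitly.
    if handler is None:
        return raw
    return handler(raw)
```

With no handler, `emit_name("Dag Run", stat_name_handler)` returns the name unchanged. With the sketch handler, `emit_name("Dag Run", sanitize_stat_name)` gives `"dag_run"`. With the original `Any` annotation, mypy could not catch a handler with the wrong signature.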
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352566346 ## File path: airflow/models/dagbag.py ## @@ -406,8 +404,10 @@ def collect_dags( FileLoadStat = namedtuple( 'FileLoadStat', "file duration dag_num task_num dags") +from airflow.utils.file import correct_maybe_zipped dag_folder = correct_maybe_zipped(dag_folder) +from airflow.utils.file import list_py_file_paths Review comment: Small nit: we could combine the two imports into a single line.
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352565842 ## File path: airflow/models/dag.py ## @@ -1254,9 +1253,11 @@ def run( """ from airflow.jobs import BackfillJob if not executor and local: +from airflow.executors.local_executor import LocalExecutor executor = LocalExecutor() elif not executor: -executor = get_default_executor() +from airflow.executors.executor_loader import ExecutorLoader Review comment: (Not for this PR, but I do wonder if we should remove this functionality as it feels somewhat out of place with the "normal" way that `airflow tasks run` is used. It's almost closer to `airflow tasks submit` in nature. But it also bypasses lots of the scheduler "gating" that maybe it shouldn't.)
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352565100 ## File path: airflow/logging_config.py ## @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- Review comment: Could you undo this change as it's the only one in the file please?
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352563810 ## File path: airflow/executors/local_executor.py ## @@ -43,40 +42,44 @@ This option could lead to the unification of the executor implementations, running locally, into just one `LocalExecutor` with multiple modes. """ - -import multiprocessing import subprocess -from queue import Empty - -from airflow.executors.base_executor import BaseExecutor +from multiprocessing import Manager, Process +from multiprocessing.managers import SyncManager +from queue import Empty, Queue # pylint: disable=unused-import # noqa: F401 +from typing import Any, List, Optional, Tuple, Union # pylint: disable=unused-import # noqa: F401 + +from airflow import AirflowException +from airflow.executors.base_executor import NOT_STARTED_MESSAGE, PARALLELISM, BaseExecutor, CommandType +from airflow.models.taskinstance import ( # pylint: disable=unused-import # noqa: F401 +TaskInstanceKeyType, TaskInstanceStateType, +) from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import State +# This is a work to be executed by a worker. +# It can Key and Command - but it can also be None, None which is actually a +# "Poison Pill" - worker seeing Poison Pill should take the pill and ... die instantly. +ExecutorWorkTodo = Tuple[Optional[TaskInstanceKeyType], Optional[CommandType]] Review comment: ```suggestion ExecutorWorkType = Tuple[Optional[TaskInstanceKeyType], Optional[CommandType]] ``` I think?
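The "poison pill" protocol described in the diff comment can be sketched as follows. Assumptions: threads and `queue.Queue` stand in for the `multiprocessing` workers and queues in `local_executor.py`, `TaskKey`/`Command` are hypothetical stand-ins for `TaskInstanceKeyType`/`CommandType`, and the worker records keys instead of running commands. A `(None, None)` item tells a worker to stop, and one pill is queued per worker.

```python
import queue
import threading
from typing import List, Optional, Tuple

# Hypothetical stand-ins for TaskInstanceKeyType / CommandType.
TaskKey = Tuple[str, str]
Command = List[str]

# A work item is (key, command), or (None, None) as the "poison pill".
ExecutorWorkType = Tuple[Optional[TaskKey], Optional[Command]]


def worker(todo: "queue.Queue[ExecutorWorkType]", done: List[TaskKey]) -> None:
    """Consume work items until a poison pill arrives."""
    while True:
        key, command = todo.get()
        try:
            if key is None or command is None:
                return  # poison pill: stop taking work
            done.append(key)  # a real worker would execute `command` here
        finally:
            todo.task_done()


def run(keys: List[TaskKey], n_workers: int = 2) -> List[TaskKey]:
    todo: "queue.Queue[ExecutorWorkType]" = queue.Queue()
    done: List[TaskKey] = []
    threads = [threading.Thread(target=worker, args=(todo, done)) for _ in range(n_workers)]
    for thread in threads:
        thread.start()
    for key in keys:
        todo.put((key, ["echo", key[1]]))
    for _ in threads:
        todo.put((None, None))  # one pill per worker, queued after all real work
    for thread in threads:
        thread.join()
    return done
```

Because the queue is FIFO and each worker exits after its first pill, all real work is consumed before the workers stop.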
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352563506 ## File path: airflow/executors/local_executor.py ## @@ -43,40 +42,44 @@ This option could lead to the unification of the executor implementations, running locally, into just one `LocalExecutor` with multiple modes. """ - -import multiprocessing import subprocess -from queue import Empty - -from airflow.executors.base_executor import BaseExecutor +from multiprocessing import Manager, Process +from multiprocessing.managers import SyncManager +from queue import Empty, Queue # pylint: disable=unused-import # noqa: F401 +from typing import Any, List, Optional, Tuple, Union # pylint: disable=unused-import # noqa: F401 + +from airflow import AirflowException Review comment: Did we not say that inside the core this should be imported from `airflow.exceptions` directly?
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352563040 ## File path: airflow/executors/kubernetes_executor.py ## @@ -601,18 +629,19 @@ def terminate(self): class KubernetesExecutor(BaseExecutor, LoggingMixin): """Executor for Kubernetes""" + def __init__(self): -self.kube_config = KubeConfig() -self.task_queue = None -self.result_queue = None -self.kube_scheduler = None -self.kube_client = None -self.worker_uuid = None +self.kube_config: KubeConfig = KubeConfig() Review comment: ```suggestion self.kube_config = KubeConfig() ``` don't need this one (but do for the rest of the ones in this fn)
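A sketch of the distinction drawn above, with hypothetical classes (`KubeConfigStub` is not Airflow's `KubeConfig`). An attribute built from a constructor call already has an obvious type. An attribute initialised to bare `None` gives mypy nothing useful to infer from, so it should be annotated with its eventual `Optional[...]` type.

```python
from typing import Optional


class KubeConfigStub:
    """Hypothetical stand-in for KubeConfig."""

    namespace = "default"


class ExecutorSketch:
    def __init__(self) -> None:
        # Type is inferred from the constructor call: no annotation needed.
        self.kube_config = KubeConfigStub()
        # Bare None says nothing about the eventual type: annotate it.
        self.worker_uuid: Optional[str] = None

    def start(self, worker_uuid: str) -> None:
        self.worker_uuid = worker_uuid
```

Without the `Optional[str]` annotation, mypy would typically reject the later `str` assignment in `start()`, because it would have settled on `None` as the attribute's type.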
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352562023 ## File path: airflow/executors/kubernetes_executor.py ## @@ -241,17 +253,23 @@ def _validate(self): class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): """Watches for Kubernetes jobs""" -def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config): +def __init__(self, + namespace: str, + watcher_queue: 'Queue[KubernetesWatchType]', + resource_version: str, + worker_uuid: Optional[str], + kube_config: Configuration): multiprocessing.Process.__init__(self) -self.namespace = namespace -self.worker_uuid = worker_uuid -self.watcher_queue = watcher_queue -self.resource_version = resource_version -self.kube_config = kube_config +self.namespace: str = namespace Review comment: https://mypy.readthedocs.io/en/stable/class_basics.html#instance-and-class-attributes says we don't need to do this: mypy infers the attribute types from the typed arguments.
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352561195 ## File path: airflow/executors/dask_executor.py ## @@ -55,23 +58,28 @@ def start(self): else: security = None -self.client = distributed.Client(self.cluster_address, security=security) +self.client = Client(self.cluster_address, security=security) self.futures = {} -def execute_async(self, key, command, queue=None, executor_config=None): -if queue is not None: -warnings.warn( -'DaskExecutor does not support queues. ' -'All tasks will be run in the same cluster' -) +def execute_async(self, + key: TaskInstanceKeyType, + command: CommandType, + queue: Optional[str] = None, + executor_config: Optional[Any] = None) -> None: +if not self.futures: +raise AirflowException("Executor should be started first.") def airflow_run(): return subprocess.check_call(command, close_fds=True) +if not self.client: +raise AirflowException("The Dask executor has not been started yet!") future = self.client.submit(airflow_run, pure=False) self.futures[future] = key -def _process_future(self, future): +def _process_future(self, future: Future) -> None: +if not self.futures: Review comment: There are mixed styles in this file -- some use `if not self.x`, some use `assert self.x` -- we should be consistent.
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352499486 ## File path: airflow/executors/celery_executor.py ## @@ -264,30 +284,45 @@ def sync(self): ) continue key, state = key_and_state -try: -if self.last_state[key] != state: -if state == celery_states.SUCCESS: -self.success(key) -del self.tasks[key] -del self.last_state[key] -elif state == celery_states.FAILURE: -self.fail(key) -del self.tasks[key] -del self.last_state[key] -elif state == celery_states.REVOKED: -self.fail(key) -del self.tasks[key] -del self.last_state[key] -else: -self.log.info("Unexpected state: %s", state) -self.last_state[key] = state -except Exception: -self.log.exception("Error syncing the Celery executor, ignoring it.") - -def end(self, synchronous=False): +self.update_task_state(key, state) + +# noinspection PyUnreachableCode Review comment: What does it think is unreachable in here? 🤔
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352498858 ## File path: airflow/executors/celery_executor.py ## @@ -57,17 +61,17 @@ @app.task -def execute_command(command_to_exec): +def task(command_to_exec: str): +"""Executes command.""" log = LoggingMixin().log log.info("Executing command in Celery: %s", command_to_exec) env = os.environ.copy() try: subprocess.check_call(command_to_exec, stderr=subprocess.STDOUT, close_fds=True, env=env) except subprocess.CalledProcessError as e: -log.exception('execute_command encountered a CalledProcessError') +log.exception('execute command encountered a CalledProcessError') Review comment: ^^
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352498412 ## File path: airflow/executors/base_executor.py ## @@ -16,67 +14,86 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +""" +Base executor - this is the base class for all the implemented executors. +""" from collections import OrderedDict +from typing import Any, Dict, List, Optional, Set, Tuple, Union -# To avoid circular imports -import airflow.utils.dag_processing -from airflow.configuration import conf Review comment: I would have thought that we want this import, not `from airflow import conf`?
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352497006 ## File path: airflow/__init__.py ## @@ -44,23 +44,8 @@ settings.initialize() -login = None # type: Optional[Callable] +from airflow.plugins_manager import integrate_plugins Review comment: Oh, because importing the plugins manager needs settings to have been loaded. Right. _Somewhat_ off scope, so this can probably stay as it is for now, but we could move all of the top-level code in airflow.plugins_manager into this new `integrate_plugins` method.
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352495420

## File path: airflow/__init__.py ##
@@ -44,23 +44,8 @@
 settings.initialize()
-login = None # type: Optional[Callable]
+from airflow.plugins_manager import integrate_plugins

Review comment:
Any reason not to have this import at the top with the normal block? Because of L31 we've disabled "wrong-import-position" for the entire file.
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352494761

## File path: airflow/__init__.py ##
@@ -44,23 +44,8 @@
 settings.initialize()
-login = None # type: Optional[Callable]
+from airflow.plugins_manager import integrate_plugins
-from airflow import executors
-from airflow import hooks
-from airflow import macros
-from airflow import operators
-from airflow import sensors
+login: Optional[Callable] = None
-
-class AirflowMacroPlugin:
-    # pylint: disable=missing-docstring
-    def __init__(self, namespace):
-        self.namespace = namespace
-
-
-operators._integrate_plugins() # pylint: disable=protected-access
-sensors._integrate_plugins() # pylint: disable=protected-access
-hooks._integrate_plugins() # pylint: disable=protected-access
-executors._integrate_plugins() # pylint: disable=protected-access
-macros._integrate_plugins() # pylint: disable=protected-access
+integrate_plugins()

Review comment:
I guess the main difference here is that this no longer works:

```
import airflow; airflow.operators.BashOperator
```

However, given that this is _already_ deprecated, and `import airflow; airflow.operators.bash_operator.BashOperator` doesn't work on master, I _think_ this is a safe change. It probably needs an explicit mention in UPDATING.md, though (if it's not there already).
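The behaviour change follows from how Python packages work. A submodule only becomes an attribute of its parent package once something has imported it. The removed `from airflow import operators` lines (and the `_integrate_plugins()` calls) used to do that as a side effect of `import airflow`. The sketch below reproduces the effect with a throwaway package. `demo_pkg` and its contents are made up for illustration.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Throwaway package: an empty __init__.py plus one submodule.
root = Path(tempfile.mkdtemp())
pkg_dir = root / "demo_pkg"
pkg_dir.mkdir()
(pkg_dir / "__init__.py").write_text("")
(pkg_dir / "operators.py").write_text("class BashOperator:\n    pass\n")
sys.path.insert(0, str(root))
importlib.invalidate_caches()

demo_pkg = importlib.import_module("demo_pkg")

# Importing the package alone does not bind its submodules as attributes.
before = hasattr(demo_pkg, "operators")

# Once anything imports the submodule, the import system binds it on the parent.
importlib.import_module("demo_pkg.operators")
after = hasattr(demo_pkg, "operators")

print(before, after)  # False True
```

This is also why `airflow.operators.bash_operator.BashOperator` can appear to work in one process and fail in another: it depends on whether some other code has already imported that submodule.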
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352492591

## File path: airflow/__init__.py ##
@@ -44,23 +44,8 @@
 settings.initialize()
-login = None # type: Optional[Callable]
+from airflow.plugins_manager import integrate_plugins
-from airflow import executors
-from airflow import hooks
-from airflow import macros
-from airflow import operators
-from airflow import sensors
+login: Optional[Callable] = None
-
-class AirflowMacroPlugin:

Review comment:
I think this is probably fine -- this class hasn't done anything since mid-2015, so I can't see why someone might be using it.
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352491482

## File path: airflow/__init__.py ##
@@ -44,23 +44,8 @@
 settings.initialize()
-login = None # type: Optional[Callable]
+from airflow.plugins_manager import integrate_plugins
-from airflow import executors
-from airflow import hooks
-from airflow import macros
-from airflow import operators
-from airflow import sensors
+login: Optional[Callable] = None
-
-class AirflowMacroPlugin:

Review comment:
Hmmm, might be worth mentioning? It's not something we mention in our docs anywhere, but...?
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299444

## File path: airflow/executors/kubernetes_executor.py ##
@@ -703,10 +735,12 @@ def _create_or_update_secret(secret_name, secret_path):
         for service_account in name_path_pair_list:
             _create_or_update_secret(service_account['name'], service_account['path'])
-    def start(self):
+    def start(self) -> None:
         """Starts the executor"""
         self.log.info('Start Kubernetes executor')
         self.worker_uuid = KubeWorkerIdentifier.get_or_create_current_kube_worker_uuid()
+        if not self.worker_uuid:
+            raise AirflowException("Could not get worker_uuid")

Review comment:
Shouldn't this exception be inside `get_or_create_current_kube_worker_uuid` instead?
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299553

## File path: airflow/executors/local_executor.py ##
@@ -43,40 +42,44 @@
 This option could lead to the unification of the executor implementations, running locally, into just one `LocalExecutor` with multiple modes.
 """
-
-import multiprocessing
 import subprocess
-from queue import Empty
-
-from airflow.executors.base_executor import BaseExecutor
+from multiprocessing import Manager, Process
+from multiprocessing.managers import SyncManager
+from queue import Empty, Queue # pylint: disable=unused-import # noqa: F401
+from typing import Any, List, Optional, Tuple, Union # pylint: disable=unused-import # noqa: F401
+
+from airflow import AirflowException
+from airflow.executors.base_executor import NOT_STARTED_MESSAGE, PARALLELISM, BaseExecutor, CommandType
+from airflow.models.taskinstance import ( # pylint: disable=unused-import # noqa: F401
+    TaskInstanceKey, TaskInstanceState,
+)
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
+# This is a work to be executed by a worker.
+# It can Key and Command - but it can also be None, None which is actually a
+# "Poison Pill" - worker seeing Poison Pill should take the pill and ... die instantly.
+ExecutorWorkTodo = Tuple[Optional[TaskInstanceKey], Optional[CommandType]]
-class LocalWorker(multiprocessing.Process, LoggingMixin):
-    """LocalWorker Process implementation to run airflow commands. Executes the given
-    command and puts the result into a result queue when done, terminating execution."""
+class LocalWorkerBase(Process, LoggingMixin):
+    """
+    LocalWorker Process implementation to run airflow commands. Executes the given

Review comment:
This docstring doesn't match the class name (`LocalWorkerBase`).
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299790

## File path: airflow/kubernetes/pod_generator.py ##
@@ -26,35 +26,7 @@
 import kubernetes.client.models as k8s
-from airflow.executors import Executors
-
-
-class PodDefaults:
-    """
-    Static defaults for the PodGenerator
-    """
-    XCOM_MOUNT_PATH = '/airflow/xcom'
-    SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
-    XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 30; done;'
-    VOLUME_MOUNT = k8s.V1VolumeMount(
-        name='xcom',
-        mount_path=XCOM_MOUNT_PATH
-    )
-    VOLUME = k8s.V1Volume(
-        name='xcom',
-        empty_dir=k8s.V1EmptyDirVolumeSource()
-    )
-    SIDECAR_CONTAINER = k8s.V1Container(
-        name=SIDECAR_CONTAINER_NAME,
-        command=['sh', '-c', XCOM_CMD],
-        image='alpine',
-        volume_mounts=[VOLUME_MOUNT],
-        resources=k8s.V1ResourceRequirements(
-            requests={
-                "cpu": "1m",
-            }
-        ),
-    )
+from airflow.kubernetes.pod_defaults import PodDefaults

Review comment:
Why did we have to move this one out? Given that this module has almost no imports, it seems like we shouldn't have to touch it.
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299109

## File path: airflow/executors/base_executor.py ##
@@ -16,67 +14,82 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+"""Base executor."""
 from collections import OrderedDict
+from queue import Queue
+from typing import Any, Dict, Optional, Set, Tuple
-# To avoid circular imports
-import airflow.utils.dag_processing
-from airflow.configuration import conf
+from airflow import AirflowException, LoggingMixin, conf
+from airflow.executors.executor_type_aliases import CommandType, ExecutorKeyType
+from airflow.models import TaskInstance
 from airflow.stats import Stats
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.dag_processing import SimpleTaskInstance
 from airflow.utils.state import State
-PARALLELISM = conf.getint('core', 'PARALLELISM')
+PARALLELISM: int = conf.getint('core', 'PARALLELISM')
 class BaseExecutor(LoggingMixin):
-
-    def __init__(self, parallelism=PARALLELISM):
-        """
-        Class to derive in order to interface with executor-type systems
-        like Celery, Yarn and the likes.
-
-        :param parallelism: how many jobs should run at one time. Set to
-            ``0`` for infinity
-        :type parallelism: int
-        """
-        self.parallelism = parallelism
-        self.queued_tasks = OrderedDict()
-        self.running = {}
-        self.event_buffer = {}
+    """
+    Class to derive in order to interface with executor-type systems
+    like Celery, Yarn and the likes.
+
+    :param parallelism: how many jobs should run at one time. Set to
+        ``0`` for infinity
+    :type parallelism: int
+    """
+
+    def __init__(self, parallelism: int = PARALLELISM):
+        super().__init__()
+        self.parallelism: int = parallelism
+        self.queued_tasks: OrderedDict[
+            ExecutorKeyType,
+            Tuple[CommandType, int, Queue, SimpleTaskInstance]] \
+            = OrderedDict()
+        self.running: Set[ExecutorKeyType] = set()
+        self.event_buffer: Dict[ExecutorKeyType, str] = {}
     def start(self): # pragma: no cover
         """
-        Executors may need to get things started. For example LocalExecutor
-        starts N workers.
+        Executors may need to get things started.
         """
-    def queue_command(self, simple_task_instance, command, priority=1, queue=None):
+    def queue_command(self,
+                      simple_task_instance: SimpleTaskInstance,
+                      command_to_run: CommandType,
+                      priority: int = 1,
+                      queue: Optional[Queue] = None):
+        """Queues command to task"""
         key = simple_task_instance.key
-        if key not in self.queued_tasks and key not in self.running:
-            self.log.info("Adding to queue: %s", command)
-            self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
+        if key not in self.queued_tasks.keys() and key not in self.running:

Review comment:
Is this going to make it slower -- i.e. does this generate a list of keys and then do a list search for the key, vs just a direct hash lookup?

What was the error? This feels like a bug in mypy rather than a bug in our code.
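On the performance question: in Python 3, `dict.keys()` (and `OrderedDict.keys()`) returns a live, set-like view rather than a list. So `key in d.keys()` is still a hash lookup, not a linear scan. A quick check follows. The key tuple is a made-up stand-in for `ExecutorKeyType`, and the value tuple only mimics the `(command, priority, queue, simple_ti)` shape.

```python
from collections import OrderedDict
from collections.abc import KeysView

# Made-up key tuple standing in for ExecutorKeyType.
key = ("example_dag", "example_task", "2019-12-01T00:00:00")
queued_tasks = OrderedDict()
queued_tasks[key] = ("example-command", 1, None, None)

keys = queued_tasks.keys()

# .keys() returns a set-like view, not a list copy of the keys.
print(isinstance(keys, KeysView), isinstance(keys, list))  # True False

# Membership on the view is the same hash lookup as on the dict itself.
print(key in keys, key in queued_tasks)  # True True

# Being a live view, it reflects later changes to the dict without being rebuilt.
queued_tasks.pop(key)
print(key in keys)  # False
```

The `.keys()` form therefore costs one extra method call, not an O(n) search. Dropping it is still the cleaner option if mypy can be satisfied some other way.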
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299235

## File path: airflow/executors/base_executor.py ##
@@ -16,67 +14,82 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+"""Base executor."""
 from collections import OrderedDict
+from queue import Queue
+from typing import Any, Dict, Optional, Set, Tuple
-# To avoid circular imports
-import airflow.utils.dag_processing
-from airflow.configuration import conf
+from airflow import AirflowException, LoggingMixin, conf
+from airflow.executors.executor_type_aliases import CommandType, ExecutorKeyType
+from airflow.models import TaskInstance
 from airflow.stats import Stats
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.dag_processing import SimpleTaskInstance
 from airflow.utils.state import State
-PARALLELISM = conf.getint('core', 'PARALLELISM')
+PARALLELISM: int = conf.getint('core', 'PARALLELISM')
 class BaseExecutor(LoggingMixin):
-
-    def __init__(self, parallelism=PARALLELISM):
-        """
-        Class to derive in order to interface with executor-type systems
-        like Celery, Yarn and the likes.
-
-        :param parallelism: how many jobs should run at one time. Set to
-            ``0`` for infinity
-        :type parallelism: int
-        """
-        self.parallelism = parallelism
-        self.queued_tasks = OrderedDict()
-        self.running = {}
-        self.event_buffer = {}
+    """
+    Class to derive in order to interface with executor-type systems
+    like Celery, Yarn and the likes.
+
+    :param parallelism: how many jobs should run at one time. Set to
+        ``0`` for infinity
+    :type parallelism: int
+    """
+
+    def __init__(self, parallelism: int = PARALLELISM):
+        super().__init__()
+        self.parallelism: int = parallelism
+        self.queued_tasks: OrderedDict[
+            ExecutorKeyType,
+            Tuple[CommandType, int, Queue, SimpleTaskInstance]] \
+            = OrderedDict()
+        self.running: Set[ExecutorKeyType] = set()
+        self.event_buffer: Dict[ExecutorKeyType, str] = {}
     def start(self): # pragma: no cover
         """
-        Executors may need to get things started. For example LocalExecutor
-        starts N workers.
+        Executors may need to get things started.
         """
-    def queue_command(self, simple_task_instance, command, priority=1, queue=None):
+    def queue_command(self,
+                      simple_task_instance: SimpleTaskInstance,
+                      command_to_run: CommandType,
+                      priority: int = 1,
+                      queue: Optional[Queue] = None):
+        """Queues command to task"""
         key = simple_task_instance.key
-        if key not in self.queued_tasks and key not in self.running:
-            self.log.info("Adding to queue: %s", command)
-            self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
+        if key not in self.queued_tasks.keys() and key not in self.running:

Review comment:
Though why does it complain here but not in `has_task`?
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352300164

## File path: docs/plugins.rst ##
@@ -167,7 +167,7 @@ definitions in Airflow.
     from airflow.models.baseoperator import BaseOperatorLink
     from airflow.operators.gcs_to_s3 import GoogleCloudStorageToS3Operator
     from airflow.sensors.base_sensor_operator import BaseSensorOperator
-    from airflow.executors.base_executor import BaseExecutor
+    from airflow.executors.base_executors import BaseExecutor

Review comment:
This counts as a breaking change - someone has probably created a custom Executor out there. Is it possible not to change this module name, please?
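If a module really does have to be renamed, one common way to avoid breaking existing custom executors is to keep the old module as a thin shim. The shim re-exports the class and emits a `DeprecationWarning`. The self-contained sketch below uses a throwaway package. `demo_exec` and both module names are illustrative; this is not what the PR does.

```python
import importlib
import sys
import tempfile
import warnings
from pathlib import Path

# Hypothetical layout: the class now lives in base_executors.py, and the old
# base_executor.py is kept only as a re-exporting shim.
root = Path(tempfile.mkdtemp())
pkg_dir = root / "demo_exec"
pkg_dir.mkdir()
(pkg_dir / "__init__.py").write_text("")
(pkg_dir / "base_executors.py").write_text("class BaseExecutor:\n    pass\n")
(pkg_dir / "base_executor.py").write_text(
    "import warnings\n"
    "from demo_exec.base_executors import BaseExecutor  # noqa: F401\n"
    "\n"
    "warnings.warn(\n"
    "    'demo_exec.base_executor is deprecated, use demo_exec.base_executors',\n"
    "    DeprecationWarning,\n"
    "    stacklevel=2,\n"
    ")\n"
)
sys.path.insert(0, str(root))
importlib.invalidate_caches()

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    old_module = importlib.import_module("demo_exec.base_executor")

new_module = importlib.import_module("demo_exec.base_executors")

# Both import paths hand back the very same class object.
same_class = old_module.BaseExecutor is new_module.BaseExecutor
warned = any(issubclass(w.category, DeprecationWarning) for w in caught)
print(same_class, warned)  # True True
```

Because the class object is shared, `isinstance` checks and subclasses written against the old path keep working while users migrate.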
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299475

## File path: airflow/executors/kubernetes_executor.py ##
@@ -715,6 +749,10 @@ def start(self):
         KubeResourceVersion.reset_resource_version()
         self.task_queue = self._manager.Queue()
         self.result_queue = self._manager.Queue()
+        if not self.task_queue:
+            raise AirflowException("Could not start task_queue")

Review comment:
Is it actually possible to hit this line, or is it just there to make mypy happy, and any exceptions would actually come from `_manager.Queue()` directly?
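On reachability: constructing a queue either returns an object or raises, so a runtime check straight after construction should presumably never fire. mypy also normally narrows an `Optional` attribute right after it is assigned in the same method, so the check may not be needed for type checking either. The sketch below puts the check where "not started" can actually happen, which is at the point of use. It assumes a plain `queue.Queue` in place of `self._manager.Queue()`, and `SketchKubeExecutor` is a hypothetical class.

```python
from queue import Queue
from typing import Optional


class SketchKubeExecutor:
    """Hypothetical executor whose queue only exists after start()."""

    def __init__(self) -> None:
        self.task_queue: Optional["Queue[str]"] = None

    def start(self) -> None:
        # Queue() returns a queue or raises; it never returns None, so an
        # `if not self.task_queue` check right after this line cannot fire.
        self.task_queue = Queue()

    def execute_async(self, command: str) -> None:
        # Narrow the Optional attribute where it is used: this is where calling
        # the executor before start() can really happen.
        if self.task_queue is None:
            raise RuntimeError("Executor should be started first")
        self.task_queue.put(command)
```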
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299255

## File path: airflow/executors/base_executor.py ##
@@ -132,47 +151,64 @@ def heartbeat(self):
         self.log.debug("Calling the %s sync method", self.__class__)
         self.sync()
-    def trigger_tasks(self, open_slots):
+    def trigger_tasks(self, open_slots: int) -> None:
         """
-        Trigger tasks
+        Triggers tasks
         :param open_slots: Number of open slots
-        :return:
         """
         sorted_queue = sorted(
             [(k, v) for k, v in self.queued_tasks.items()],
             key=lambda x: x[1][1],
             reverse=True)
         for _ in range(min((open_slots, len(self.queued_tasks)))):
-            key, (command, _, queue, simple_ti) = sorted_queue.pop(0)
+            key, (command, _, _, simple_ti) = sorted_queue.pop(0)
             self.queued_tasks.pop(key)
-            self.running[key] = command
+            self.running.add(key)
             self.execute_async(key=key,
                                command=command,
-                               queue=queue,

Review comment:
This feels like an API change to the executor.
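Two ways dropping the `queue=` argument could affect existing code, sketched with hypothetical classes rather than Airflow's own. First, callers that still pass `queue=` fail against the new signature. Second, third-party executors that still accept `queue` stop receiving it, without any error.

```python
from typing import Any, List, Optional, Tuple


class BaseExecutorAfterChange:
    """Hypothetical base class using the new signature, without `queue`."""

    def execute_async(self, key: str, command: List[str],
                      executor_config: Optional[Any] = None) -> None:
        raise NotImplementedError()


class QueueRoutingExecutor(BaseExecutorAfterChange):
    """Hypothetical third-party executor written against the old signature."""

    def __init__(self) -> None:
        self.routed: List[Tuple[str, Optional[str]]] = []

    def execute_async(self, key, command, queue=None, executor_config=None):
        # Picks a destination from `queue`; new callers never pass it.
        self.routed.append((key, queue))


# 1. Old-style callers that still pass queue= fail against the new signature.
try:
    BaseExecutorAfterChange().execute_async(key="ti_key", command=["true"], queue="default")
    old_call_failed = False
except TypeError:
    old_call_failed = True

# 2. New-style callers stop supplying the queue to old-style subclasses.
executor = QueueRoutingExecutor()
executor.execute_async(key="ti_key", command=["true"])

print(old_call_failed, executor.routed)  # True [('ti_key', None)]
```

The second case is the harder one to notice, because nothing raises. Tasks that were meant for a specific queue are simply routed as if no queue had been set.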
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299639

## File path: airflow/executors/local_executor.py ##
@@ -201,31 +273,50 @@ def sync(self):
                     break
         def end(self):
-            # Sending poison pill to all worker
+            """Ends the executor. Sends the poison pill to all workers."""
             for _ in self.executor.workers:
                 self.queue.put((None, None))
             # Wait for commands to finish
             self.queue.join()
             self.executor.sync()
-    def start(self):
-        self.manager = multiprocessing.Manager()
+    def start(self) -> None:
+        """Starts the executor"""
+        self.manager = Manager()
         self.result_queue = self.manager.Queue()
         self.workers = []
         self.workers_used = 0
         self.workers_active = 0
-        self.impl = (LocalExecutor._UnlimitedParallelism(self) if self.parallelism == 0
-                     else LocalExecutor._LimitedParallelism(self))
+        self.impl = (LocalExecutor.UnlimitedParallelism(self) if self.parallelism == 0
+                     else LocalExecutor.LimitedParallelism(self))
         self.impl.start()
-    def execute_async(self, key, command, queue=None, executor_config=None):
-        self.impl.execute_async(key=key, command=command)
+    def execute_async(self, key: TaskInstanceKey,
+                      command: CommandType,
+                      executor_config: Optional[Any] = None) -> None:
+        """Execute asynchronously."""
+        if not self.impl:
+            raise AirflowException(NOT_STARTED_MESSAGE)
+        self.impl.execute_async(key=key, command=command, executor_config=executor_config)
-    def sync(self):
+    def sync(self) -> None:
+        """
+        Sync will get called periodically by the heartbeat method.
+        """
+        if not self.impl:
+            raise AirflowException(NOT_STARTED_MESSAGE)
         self.impl.sync()
-    def end(self):
+    def end(self) -> None:
+        """
+        Ends the executor.
+        :return:
+        """
+        if not self.impl:
+            raise AirflowException("Executor should be started first")
         self.impl.end()
+        if not self.manager:
+            raise AirflowException("Executor should be started first")

Review comment:
This should use `NOT_STARTED_MESSAGE` here (or the custom exception subclass).
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352298971

## File path: airflow/executors/all_executors.py ##
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""All executors."""
+from typing import Optional
+
+from airflow import AirflowException, LoggingMixin, conf
+from airflow.executors.base_executor import BaseExecutor
+
+
+class AllExecutors:

Review comment:
```suggestion
class Loader:
```
(and rename to loader.py)
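The name `Loader` fits because the class's job is to look up executors and import them only when one is requested. Deferring those imports is part of what helps avoid the import cycles this PR is about. Below is a rough sketch of such a loader. It assumes executors are identified either by a short alias or by a dotted `module.ClassName` path. `ExecutorLoader`, its alias table and `load()` are illustrative, not the API in the PR.

```python
import importlib
from typing import Dict


class ExecutorLoader:
    """Hypothetical loader: executor modules are imported only when requested."""

    # Illustrative aliases; any other name is treated as a dotted path.
    known_executors: Dict[str, str] = {
        "LocalExecutor": "airflow.executors.local_executor.LocalExecutor",
        "SequentialExecutor": "airflow.executors.sequential_executor.SequentialExecutor",
    }

    @classmethod
    def load(cls, name: str) -> type:
        """Resolve an alias or a "module.ClassName" path and import it lazily."""
        path = cls.known_executors.get(name, name)
        module_name, _, class_name = path.rpartition(".")
        if not module_name:
            raise ValueError(f"Unknown executor {name!r}: expected an alias or a dotted path")
        module = importlib.import_module(module_name)
        return getattr(module, class_name)
```

For example, `ExecutorLoader.load("collections.OrderedDict")` resolves a dotted path without Airflow installed. `OrderedDict` is used only because it is importable everywhere; it is obviously not an executor.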
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299315

## File path: airflow/executors/base_executor.py ##
@@ -185,25 +221,22 @@ def get_event_buffer(self, dag_ids=None):
         return cleared_events
     def execute_async(self,
-                      key,
-                      command,
-                      queue=None,
-                      executor_config=None): # pragma: no cover
+                      key: TaskInstanceKey,
+                      command: CommandType,
+                      executor_config: Optional[Any] = None) -> None: # pragma: no cover
         """
         This method will execute the command asynchronously.
+
+        :param key: Unique key for the task instance
+        :param command: Command to run
+        :param executor_config: Configuration passed to the executor.
         """
         raise NotImplementedError()
-    def end(self): # pragma: no cover
+    def end(self) -> None: # pragma: no cover
         """
         This method is called when the caller is done submitting job and
         wants to wait synchronously for the job submitted previously to be
         all done.
         """
         raise NotImplementedError()
-
-    def terminate(self):

Review comment:
Also feels like an API change.
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299527

## File path: airflow/executors/kubernetes_executor.py ##
@@ -820,8 +877,14 @@ def _flush_result_queue(self):
             except Empty:
                 break
-    def end(self):
+    def end(self) -> None:
         """Called when the executor shuts down"""
+        if not self.task_queue:
+            raise AirflowException(NOT_STARTED_MESSAGE)
+        if not self.result_queue:
+            raise AirflowException(NOT_STARTED_MESSAGE)

Review comment:
All of these feel like they should raise a custom exception subclass instead.
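One way to read this suggestion: create a dedicated subclass of `AirflowException` that carries the message. Each `if not ...: raise AirflowException(NOT_STARTED_MESSAGE)` pair then raises one greppable exception type. The sketch stubs `AirflowException` so it runs without Airflow installed. `ExecutorNotStartedException` and `SketchExecutor` are invented names for illustration.

```python
from typing import List, Optional


class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""


class ExecutorNotStartedException(AirflowException):
    """Hypothetical subclass raised when an executor is used before start()."""

    def __init__(self, executor_name: str) -> None:
        super().__init__(f"{executor_name} should be started first")


class SketchExecutor:
    def __init__(self) -> None:
        self.task_queue: Optional[List[str]] = None
        self.result_queue: Optional[List[str]] = None

    def start(self) -> None:
        self.task_queue = []
        self.result_queue = []

    def end(self) -> None:
        # `is None` rather than `not ...`: an empty list is falsy, but an empty
        # queue after start() does not mean the executor was never started.
        if self.task_queue is None or self.result_queue is None:
            raise ExecutorNotStartedException(type(self).__name__)
        self.task_queue.clear()
        self.result_queue.clear()
```

Because the new type subclasses `AirflowException`, existing `except AirflowException` handlers keep working, while new code can catch the narrower type.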
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299335

## File path: airflow/executors/celery_executor.py ##
@@ -57,17 +61,17 @@
 @app.task
-def execute_command(command_to_exec):
+def task(command_to_exec: str):
+    """Executes command."""
     log = LoggingMixin().log
     log.info("Executing command in Celery: %s", command_to_exec)
     env = os.environ.copy()
     try:
         subprocess.check_call(command_to_exec, stderr=subprocess.STDOUT, close_fds=True, env=env)
     except subprocess.CalledProcessError as e:
-        log.exception('execute_command encountered a CalledProcessError')
+        log.exception('execute command encountered a CalledProcessError')

Review comment:
```suggestion
        log.exception('execute_command encountered a CalledProcessError')
```
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299726

## File path: airflow/jobs/scheduler_job.py ##
@@ -35,21 +35,24 @@
 from sqlalchemy import and_, func, not_, or_
 from sqlalchemy.orm.session import make_transient
-from airflow import executors, models, settings
+from airflow import models, settings
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
+from airflow.executors.local_executor import LocalExecutor
+from airflow.executors.sequential_executor import SequentialExecutor
 from airflow.jobs.base_job import BaseJob
 from airflow.models import DAG, DagRun, SlaMiss, errors
+from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.stats import Stats
 from airflow.ti_deps.dep_context import SCHEDULEABLE_STATES, SCHEDULED_DEPS, DepContext
 from airflow.ti_deps.deps.pool_slots_available_dep import STATES_TO_COUNT_AS_RUNNING
 from airflow.utils import asciiart, helpers, timezone
 from airflow.utils.dag_processing import (
-    AbstractDagFileProcessor, DagFileProcessorAgent, SimpleDag, SimpleDagBag, SimpleTaskInstance,
-    list_py_file_paths,
+    AbstractDagFileProcessor, DagFileProcessorAgent, SimpleDag, SimpleDagBag,

Review comment:
Why did SimpleTI get moved but SimpleDag didn't?
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352300093

## File path: airflow/utils/dag_processing.py ##
@@ -1394,3 +1196,8 @@ def emit_metrics(self):
         # TODO: Remove before Airflow 2.0
         Stats.gauge('collect_dags', parse_time)
         Stats.gauge('dagbag_import_errors', sum(stat.import_errors for stat in self._file_stats.values()))
+
+    # pylint: disable=missing-docstring
+    @property
+    def file_paths(self):
+        return self._file_paths

Review comment:
API change?
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299897

## File path: airflow/models/dagbag.py ##
@@ -317,9 +317,7 @@ def kill_zombies(self, zombies, session=None):
         had a heartbeat for too long, in the current DagBag.
         :param zombies: zombie task instances to kill.
-        :type zombies: airflow.utils.dag_processing.SimpleTaskInstance

Review comment:
It sounds like we should add this back; it's for docs, not the type system, anyway.

The way to avoid imported modules/classes becoming part of the public API of this module is to use `__all__`.
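On the `__all__` point: `__all__` declares which names make up a module's public interface. It controls what `from module import *` binds, and Sphinx autodoc respects it when listing module members. It does not stop attribute access, so an imported helper stays reachable. Here is a self-contained check using an in-memory module, `dagbag_sketch`, made up for illustration.

```python
import sys
import types

source = '''
from collections import OrderedDict  # imported for internal use only

__all__ = ["DagBag"]


class DagBag:
    """Public class of this illustrative module."""
'''

module = types.ModuleType("dagbag_sketch")
exec(source, module.__dict__)
sys.modules["dagbag_sketch"] = module

star_namespace: dict = {}
exec("from dagbag_sketch import *", star_namespace)

print("DagBag" in star_namespace)       # True
print("OrderedDict" in star_namespace)  # False
# __all__ does not hide anything: the import is still reachable as an attribute.
print(hasattr(module, "OrderedDict"))   # True
```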
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r351277914

## File path: airflow/utils/dag_processing.py

@@ -63,204 +62,109 @@ class SimpleDag(BaseDag):
     :type pickle_id: unicode
     """

-    def __init__(self, dag, pickle_id=None):
-        self._dag_id = dag.dag_id
-        self._task_ids = [task.task_id for task in dag.tasks]
-        self._full_filepath = dag.full_filepath
-        self._is_paused = dag.is_paused
-        self._concurrency = dag.concurrency
-        self._pickle_id = pickle_id
-        self._task_special_args = {}
+    def __init__(self, dag, pickle_id: Optional[str] = None):
+        self._dag_id: str = dag.dag_id
+        self._task_ids: List[str] = [task.task_id for task in dag.tasks]
+        self._full_filepath: str = dag.full_filepath
+        self._is_paused: bool = dag.is_paused
+        self._concurrency: int = dag.concurrency
+        self._pickle_id: Optional[str] = pickle_id
+        self._task_special_args: Dict[str, Any] = {}
         for task in dag.tasks:
             special_args = {}
             if task.task_concurrency is not None:
                 special_args['task_concurrency'] = task.task_concurrency
-            if len(special_args) > 0:
+            if special_args:
                 self._task_special_args[task.task_id] = special_args

     @property
-    def dag_id(self):
+    def dag_id(self) -> str:
         """
         :return: the DAG ID
         :rtype: unicode
         """
         return self._dag_id

     @property
-    def task_ids(self):
+    def task_ids(self) -> List[str]:
         """
         :return: A list of task IDs that are in this DAG
         :rtype: list[unicode]
         """
         return self._task_ids

     @property
-    def full_filepath(self):
+    def full_filepath(self) -> str:
         """
         :return: The absolute path to the file that contains this DAG's definition
         :rtype: unicode
         """
         return self._full_filepath

     @property
-    def concurrency(self):
+    def concurrency(self) -> int:
         """
         :return: maximum number of tasks that can run simultaneously from this DAG
         :rtype: int
         """
         return self._concurrency

     @property
-    def is_paused(self):
+    def is_paused(self) -> bool:
         """
         :return: whether this DAG is paused or not
         :rtype: bool
         """
         return self._is_paused

     @property
-    def pickle_id(self):
+    def pickle_id(self) -> Optional[str]:
         """
         :return: The pickle ID for this DAG, if it has one. Otherwise None.
         :rtype: unicode
         """
         return self._pickle_id

     @property
-    def task_special_args(self):
+    def task_special_args(self) -> Dict[str, Any]:
+        """Special arguments of the task."""
         return self._task_special_args

-    def get_task_special_arg(self, task_id, special_arg_name):
+    def get_task_special_arg(self, task_id: str, special_arg_name: str):
+        """Retrieve special arguments of the task."""
         if task_id in self._task_special_args and special_arg_name in self._task_special_args[task_id]:
             return self._task_special_args[task_id][special_arg_name]
         else:
             return None


-class SimpleTaskInstance:
-    def __init__(self, ti):
-        self._dag_id = ti.dag_id
-        self._task_id = ti.task_id
-        self._execution_date = ti.execution_date
-        self._start_date = ti.start_date
-        self._end_date = ti.end_date
-        self._try_number = ti.try_number
-        self._state = ti.state
-        self._executor_config = ti.executor_config
-        if hasattr(ti, 'run_as_user'):
-            self._run_as_user = ti.run_as_user
-        else:
-            self._run_as_user = None
-        if hasattr(ti, 'pool'):
-            self._pool = ti.pool
-        else:
-            self._pool = None
-        if hasattr(ti, 'priority_weight'):
-            self._priority_weight = ti.priority_weight
-        else:
-            self._priority_weight = None
-        self._queue = ti.queue
-        self._key = ti.key
-
-    @property
-    def dag_id(self):
-        return self._dag_id
-
-    @property
-    def task_id(self):
-        return self._task_id
-
-    @property
-    def execution_date(self):
-        return self._execution_date
-
-    @property
-    def start_date(self):
-        return self._start_date
-
-    @property
-    def end_date(self):
-        return self._end_date
-
-    @property
-    def try_number(self):
-        return self._try_number
-
-    @property
-    def state(self):
-        return self._state
-
-    @property
-    def pool(self):
-        return self._pool
-
-    @property
-    def priority_weight(self):
-        return self._priority_weight
-
-    @property
-    def queue(self):
-
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports. Depends on [AIRFLOW-6010]
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports. Depends on [AIRFLOW-6010] URL: https://github.com/apache/airflow/pull/6596#discussion_r350326731

## File path: airflow/jobs/scheduler_job.py

@@ -66,7 +67,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
     :param dag_id_white_list: If specified, only look at these DAG ID's
     :type dag_id_white_list: list[unicode]
     :param zombies: zombie task instances to kill
-    :type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
+    :type zombies: list[airflow.executors.executor_types.SimpleTaskInstance]

Review comment:
Types in doc comments don't respect or use imports; they create links based on the full dotted name only. (mypy respects imports; Sphinx doesn't, and needs the full package path to create a link when rendering.)
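A small sketch of the distinction, using a standard-library class in place of `SimpleTaskInstance` (the function `kill_zombies_like` is hypothetical): the annotation is resolved through the module's imports, which is what mypy and `typing.get_type_hints` rely on, while the `:type:` field in the docstring is plain text, so it has to spell out the full dotted path for Sphinx to turn it into a link.

```python
from collections import OrderedDict
from typing import get_type_hints


def kill_zombies_like(zombies: OrderedDict) -> None:
    """Hypothetical function carrying both kinds of type information.

    :param zombies: zombie task instances to kill
    :type zombies: collections.OrderedDict
    """


# The annotation is a real object, resolved via the import at the top.
hints = get_type_hints(kill_zombies_like)
print(hints["zombies"] is OrderedDict)  # True

# The docstring field is just a string; nothing in Python resolves it against
# the module's imports, so only the fully qualified name is usable for linking.
print(":type zombies: collections.OrderedDict" in kill_zombies_like.__doc__)  # True
```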
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports. Depends on [AIRFLOW-6010]
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports. Depends on [AIRFLOW-6010] URL: https://github.com/apache/airflow/pull/6596#discussion_r347861744

## File path: tests/dags/test_subdag.py

@@ -24,7 +24,7 @@ from datetime import datetime, timedelta
-from airflow.models import DAG
+from airflow.models.dag import DAG

Review comment:
## Result of the current situation

> Can you tell which way is better

Neither :) They both work. But yes, having some internal/enforced rules for the Airflow code base is probably a good thing for consistency.

> Seems like people randomly choose the import they should use without knowing the consequences

The only consequence is "if we change the layout, it might break in the future". Right now it has zero effect on whether import cycles occur.

## DAG developer's perspective

> Maybe the deprecation warning should only be thrown if you are importing airflow from within airflow core code itself

Yes, if this was achievable I would be happy with this approach. The deprecation warning wouldn't _require_ a rewrite, but it would be annoying/noisy until the change was done, i.e. not something I want to force on users without a definite benefit.

> I hope you can agree with me that it makes sense for Airflow "core" in airflow repository use a single, direct import (airflow.models.dag.DAG) where circular imports will be least likely

Grudgingly, because people will look at the internals and copy them, and I really feel this is leaking abstractions: `airflow.models.DAG` or `airflow.DAG` is what consumers of the library should be using. Part of this might be fixed by updating the docs to not build/publish docs for any `airflow.models.*` package (and bonus points for rewriting any references to just `airflow.models.Class`?)

## Avoiding cyclic imports

Avoiding cyclic imports doesn't worry me, as the code either works, or it crashes the tests and we fix it on a case-by-case basis. I haven't ever seen a case where what an end user imports causes or avoids a cyclic import; it's all only within Airflow PRs. Is there a case I've not seen where we've got a broken import based on the order in which packages are imported in tests/user code?

## Importing 'airflow' package first

`python -c "import airflow.hooks"` will _always_ import `airflow/__init__.py` first and then load `airflow/hooks/__init__.py`. That is how Python imports work. I'm all in favour of removing the side effects of importing (I reported https://issues.apache.org/jira/browse/AIRFLOW-1931 a long time ago), but the cyclic import issue is just not a concern to me: to my knowledge, import cycles can only be created when _changing_ the Airflow code (i.e. writing a PR). If you aren't changing the code, it is impossible to get a cycle.
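A minimal sketch of the import behaviour described above, assuming a throwaway package named `demo_pkg` written to a temporary directory (all names are hypothetical): asking only for the subpackage `demo_pkg.hooks` still runs `demo_pkg/__init__.py` to completion first.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Build a throwaway package on disk: demo_pkg/__init__.py and demo_pkg/hooks/__init__.py.
root = Path(tempfile.mkdtemp())
(root / "demo_pkg" / "hooks").mkdir(parents=True)

# The parent package marks itself as fully initialised on its last line.
(root / "demo_pkg" / "__init__.py").write_text("INIT_DONE = True\n")

# The subpackage records whether its parent had already finished running.
(root / "demo_pkg" / "hooks" / "__init__.py").write_text(
    "import sys\n"
    "parent = sys.modules.get('demo_pkg')\n"
    "PARENT_RAN_FIRST = getattr(parent, 'INIT_DONE', False)\n"
)

sys.path.insert(0, str(root))
importlib.invalidate_caches()

# Equivalent to `python -c "import demo_pkg.hooks"`: we only ask for the subpackage...
hooks = importlib.import_module("demo_pkg.hooks")

# ...but Python imported and ran the parent package's __init__.py first.
print("demo_pkg" in sys.modules)  # True
print(hooks.PARENT_RAN_FIRST)  # True
```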
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r347486502

## File path: tests/dags/test_subdag.py

@@ -24,7 +24,7 @@ from datetime import datetime, timedelta
-from airflow.models import DAG
+from airflow.models.dag import DAG

Review comment:
> sometimes you do 'from airflow import DAG' sometimes 'from airflow.models import DAG' and sometimes 'from airflow.models.dag import DAG

From an end user's point of view, I don't want them to have to know which models are in which sub-package, as that is an implementation detail they shouldn't need to care about. (I mostly don't like the one-class-per-package style, so I want to avoid it where possible. Pet peeve of mine, I know.)

> I am going to deprecate 'from airflow import DAG'

I'm not a fan of this, as it would require almost every single DAG that exists right now to be changed. We can manage the cyclic imports by explicitly moving some imports to the end of `airflow/__init__.py`.

> An average (or even experienced) developer [user] of Airflow has no idea about those consequences of different ways of importing such classes from different packages.

Once `import airflow` has happened, all the packages are loaded and import order doesn't matter, so we can handle this all "internally" (which we already do via our unit tests). My understanding is that if it's working right now, then no import cycles are possible, no matter in what order the user imports things.
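A minimal sketch of the "move some imports to the end of `airflow/__init__.py`" technique, assuming two hypothetical throwaway packages whose `models` submodule imports a setting back from the package (none of these names are Airflow's). With the re-export at the top of `__init__.py`, the cycle hits a partially initialised package and raises `ImportError`; defining the setting first and re-exporting at the end lets the same cycle resolve.

```python
import importlib
import sys
import tempfile
from pathlib import Path

ROOT = Path(tempfile.mkdtemp())
sys.path.insert(0, str(ROOT))


def build_package(name: str, init_source: str) -> None:
    """Write a throwaway package `name` whose `models` module imports back from it."""
    pkg_dir = ROOT / name
    pkg_dir.mkdir()
    (pkg_dir / "__init__.py").write_text(init_source)
    # models needs SETTING from the package's __init__.py: this is the cycle.
    (pkg_dir / "models.py").write_text(
        f"from {name} import SETTING\n\nclass DAG:\n    default_setting = SETTING\n"
    )


# Variant 1: re-export DAG at the top, before SETTING exists.
build_package("cyc_top", "from cyc_top.models import DAG\nSETTING = 'x'\n")
# Variant 2: define SETTING first and re-export DAG at the end of __init__.py.
build_package("cyc_end", "SETTING = 'x'\nfrom cyc_end.models import DAG\n")
importlib.invalidate_caches()

try:
    importlib.import_module("cyc_top")
    top_result = "ok"
except ImportError:  # e.g. "cannot import name 'SETTING' from partially initialized module"
    top_result = "ImportError"

end_pkg = importlib.import_module("cyc_end")
print(top_result)  # ImportError
print(end_pkg.DAG.default_setting)  # x
```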
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r347422690

## File path: airflow/executors/dask_executor.py

@@ -44,6 +42,7 @@ def __init__(self, cluster_address=None):
         super().__init__(parallelism=0)

     def start(self):
+        import distributed

Review comment:
Yeah, delaying the import of `*Executor` until needed definitely makes sense to keep. But delaying this import possibly delays errors due to missing packages for the current/selected executor until task execution, instead of "at startup".
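A minimal sketch of that trade-off, assuming a hypothetical executor class and a deliberately absent module name (`not_installed_dask_like`); neither is Airflow's API. An `import` inside a method means a missing package only surfaces when that method runs, whereas a cheap `importlib.util.find_spec` check at construction reports it up front without actually loading the dependency.

```python
import importlib
import importlib.util


class LazyDepExecutor:
    """Hypothetical executor that defers importing its optional dependency."""

    def __init__(self, module_name: str, fail_fast: bool = False):
        self.module_name = module_name
        # Optional fail-fast check: find_spec locates a top-level module without
        # executing it, so a missing package is reported when the executor is built.
        if fail_fast and importlib.util.find_spec(module_name) is None:
            raise ImportError(f"executor dependency {module_name!r} is not installed")

    def execute(self) -> str:
        # Deferred import: the dependency is loaded (or found missing) only here.
        module = importlib.import_module(self.module_name)
        return module.__name__


MISSING = "not_installed_dask_like"  # hypothetical package name, assumed absent

# Lazy import only: construction succeeds and the error surfaces later.
executor = LazyDepExecutor(MISSING)
try:
    executor.execute()
    failed_late = False
except ImportError:
    failed_late = True

# With the fail-fast check, the same problem is reported immediately.
try:
    LazyDepExecutor(MISSING, fail_fast=True)
    failed_early = False
except ImportError:
    failed_early = True

print(failed_late, failed_early)  # True True
print(LazyDepExecutor("json", fail_fast=True).execute())  # json
```

The check keeps the import itself lazy (so unused executors' packages are never loaded) while restoring the "fail at startup" behaviour the comment is worried about losing.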
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r347282260

## File path: airflow/executors/__init__.py

@@ -57,31 +56,25 @@ def get_default_executor():
     return DEFAULT_EXECUTOR


-class Executors:

Review comment:
I'm not immediately seeing why we need to pull this out of this file; I think it was okay as it was? (I'm not a huge fan of one-file-per-class, especially for small classes.)
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r347286614

## File path: airflow/dag_context_manager/__init__.py

@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""DAG context manager used for convenient task creation."""
+# Used by DAG context_managers
+from typing import Optional
+
+from airflow.models.dag import DAG
+
+CONTEXT_MANAGER_DAG = None # type: Optional[DAG]

Review comment:
Or even a class property on `class DAG` itself.
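A minimal sketch of that suggestion, using a simplified stand-in rather than Airflow's real `DAG` (the `_context_manager_dag` attribute, `get_current_dag` helper and `Task` class are hypothetical): the "current DAG" lives in a class-level attribute, set in `__enter__` and restored in `__exit__`, so no separate module-global is needed.

```python
from typing import List, Optional


class DAG:
    """Simplified, hypothetical DAG; only the context-manager bookkeeping is shown."""

    # Class-level slot replacing a module-global CONTEXT_MANAGER_DAG.
    _context_manager_dag: Optional["DAG"] = None

    def __init__(self, dag_id: str):
        self.dag_id = dag_id
        self.task_ids: List[str] = []
        self._previous: Optional["DAG"] = None

    def __enter__(self) -> "DAG":
        # Remember the outer DAG so nested `with` blocks restore it correctly.
        self._previous = DAG._context_manager_dag
        DAG._context_manager_dag = self
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        DAG._context_manager_dag = self._previous
        self._previous = None

    @classmethod
    def get_current_dag(cls) -> Optional["DAG"]:
        return cls._context_manager_dag


class Task:
    """Hypothetical task that attaches itself to the DAG active at creation time."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.dag = DAG.get_current_dag()
        if self.dag is not None:
            self.dag.task_ids.append(task_id)


with DAG("outer") as outer:
    Task("a")
    with DAG("inner") as inner:
        Task("b")
    Task("c")

print(outer.task_ids)  # ['a', 'c']
print(inner.task_ids)  # ['b']
print(DAG.get_current_dag())  # None
```

Keeping the state on the class also keeps it next to the code that reads and writes it, which is the coupling the two comments on this file point at.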
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r347285095

## File path: tests/dags/test_subdag.py

@@ -24,7 +24,7 @@ from datetime import datetime, timedelta
-from airflow.models import DAG
+from airflow.models.dag import DAG

Review comment:
Shouldn't be needed.
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r347283883

## File path: airflow/dag_context_manager/__init__.py

@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""DAG context manager used for convenient task creation."""
+# Used by DAG context_managers
+from typing import Optional
+
+from airflow.models.dag import DAG
+
+CONTEXT_MANAGER_DAG = None # type: Optional[DAG]

Review comment:
Rather than its own file, might this make more sense in `airflow/models/dag.py`? It's very closely tied to `class DAG`.
[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r347286274

## File path: airflow/executors/dask_executor.py

@@ -44,6 +42,7 @@ def __init__(self, cluster_address=None):
         super().__init__(parallelism=0)

     def start(self):
+        import distributed

Review comment:
The changes to `airflow/executors/*` shouldn't be needed now, right?