[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-03 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r353160974
 
 

 ##
 File path: UPDATING.md
 ##
 @@ -41,6 +41,30 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### Changes to airflow package imports
+
+Importing `executor`, `hooks`, `macros`, `operators`, `sensors` packages no
+longer works indirectly via airflow package. For example this will not work:
+
+```python
+import airflow
+
+operator = airflow.operators.BashOperator(...)
 
 Review comment:
   `airflow.operators.BashOperator` was relying on the "load all operators from 
submodule and stick it on operators module" behaviour, and users were already warned 
about it in the `Removed deprecated import mechanism` section of UPDATING.md, which 
shows the same import paths you've got here.
   
   So we might not need this here (though that section could be updated to say it 
affects the other packages too).
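Removing the package-level re-exports exposes ordinary Python import semantics: `import airflow` only runs `airflow/__init__.py`, and a submodule becomes an attribute of its parent package only once something imports it. A minimal, self-contained sketch using a throwaway package (`demo_pkg` and its contents are hypothetical, not Airflow code):

```python
import importlib
import sys
import tempfile
from pathlib import Path
from typing import Tuple


def check_submodule_binding() -> Tuple[bool, bool]:
    """Return (visible after `import demo_pkg`, visible after importing the submodule)."""
    with tempfile.TemporaryDirectory() as tmp:
        pkg = Path(tmp) / "demo_pkg"
        pkg.mkdir()
        # An empty __init__.py: importing the package imports no submodules.
        (pkg / "__init__.py").write_text("")
        (pkg / "operators.py").write_text("class BashOperator:\n    pass\n")
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            demo_pkg = importlib.import_module("demo_pkg")
            before = hasattr(demo_pkg, "operators")
            # Importing the submodule explicitly binds it as an attribute of the parent.
            importlib.import_module("demo_pkg.operators")
            after = hasattr(demo_pkg, "operators")
            return before, after
        finally:
            sys.path.remove(tmp)
            for name in ("demo_pkg.operators", "demo_pkg"):
                sys.modules.pop(name, None)


if __name__ == "__main__":
    print(check_submodule_binding())
```

Under these assumptions it prints `(False, True)`. The practical consequence is that DAG files need to import an operator from the submodule that defines it rather than reaching through the top-level `airflow` package.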


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-03 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r353159009
 
 

 ##
 File path: tests/operators/test_google_api_to_s3_transfer.py
 ##
 @@ -29,7 +30,7 @@
 class TestGoogleApiToS3Transfer(unittest.TestCase):
 
 def setUp(self):
-configuration.load_test_config()
+load_test_config()
 
 Review comment:
   Remove here too.


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-03 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r353158679
 
 

 ##
 File path: tests/gcp/hooks/test_google_discovery_api.py
 ##
 @@ -20,15 +20,16 @@
 import unittest
 from unittest.mock import call, patch
 
-from airflow import configuration, models
+from airflow import models
+from airflow.configuration import load_test_config
 from airflow.gcp.hooks.discovery_api import GoogleDiscoveryApiHook
 from airflow.utils import db
 
 
 class TestGoogleDiscoveryApiHook(unittest.TestCase):
 
 def setUp(self):
-configuration.load_test_config()
+load_test_config()
 
 Review comment:
   I thought I got rid of all of these. This isn't needed any longer and can 
just be removed entirely.
   ```suggestion
   ```


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-03 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r353156998
 
 

 ##
 File path: airflow/utils/dag_processing.py
 ##
 @@ -724,26 +530,27 @@ class DagFileProcessorManager(LoggingMixin):
 """
 
 def __init__(self,
- dag_directory,
- file_paths,
- max_runs,
- processor_factory,
- processor_timeout,
- signal_conn,
- async_mode=True):
-self._file_paths = file_paths
-self._file_path_queue = []
-self._dag_directory = dag_directory
-self._max_runs = max_runs
-self._processor_factory = processor_factory
-self._signal_conn = signal_conn
-self._async_mode = async_mode
+ dag_directory: str,
+ file_paths: List[str],
+ max_runs: int,
+ processor_factory: Callable[[str, List[Any]], AbstractDagFileProcessor],
+ processor_timeout: timedelta,
+ signal_conn: Connection,
+ async_mode: bool = True):
+self._file_paths: List[str] = file_paths
+self._file_path_queue: List[str] = []
+self._dag_directory: str = dag_directory
+self._max_runs: int = max_runs
+self._processor_factory: Callable[[str, List[Any]], AbstractDagFileProcessor] = processor_factory
+self._signal_conn: Connection = signal_conn
+self._async_mode: bool = async_mode
 
 Review comment:
   Style point that we should decide upon: in cases like this, where the function 
arg is typed, we don't also need to specify the type on the instance attribute; 
mypy will infer it automatically.
   
   So this could be written as:
   
   ```
   def __init__(self,
                dag_directory: str,
                file_paths: List[str],
                max_runs: int,
                processor_factory: Callable[[str, List[Any]], AbstractDagFileProcessor],
                processor_timeout: timedelta,
                signal_conn: Connection,
                async_mode: bool = True):
       self._file_paths = file_paths
       self._file_path_queue: List[str] = []
       self._dag_directory = dag_directory
       self._max_runs = max_runs
       self._processor_factory = processor_factory
       self._signal_conn = signal_conn
       self._async_mode = async_mode
   ```
   
   
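To illustrate the proposed style: mypy infers an instance attribute's type from the annotated parameter assigned to it, so only attributes initialised from something uninformative (such as an empty list) need their own annotation. A sketch using a hypothetical class that mirrors the constructor above (`signal_conn` is dropped and the factory's return type is loosened to `Any` to keep it self-contained):

```python
from datetime import timedelta
from typing import Any, Callable, List


class FileProcessorManagerSketch:
    """Hypothetical stand-in for DagFileProcessorManager (not Airflow code)."""

    def __init__(self,
                 dag_directory: str,
                 file_paths: List[str],
                 max_runs: int,
                 processor_factory: Callable[[str, List[Any]], Any],
                 processor_timeout: timedelta,
                 async_mode: bool = True):
        # mypy infers each attribute's type from the annotated parameter,
        # so the annotation is not repeated here.
        self._file_paths = file_paths
        self._dag_directory = dag_directory
        self._max_runs = max_runs
        self._processor_factory = processor_factory
        self._processor_timeout = processor_timeout
        self._async_mode = async_mode
        # An empty literal gives mypy nothing to infer the element type from,
        # so this attribute still needs an explicit annotation.
        self._file_path_queue: List[str] = []
```

Adding `reveal_type(self._max_runs)` inside `__init__` and running mypy should report `builtins.int`, which is a quick way to confirm the inference without duplicating annotations.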


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-03 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r353154607
 
 

 ##
 File path: airflow/executors/dask_executor.py
 ##
 @@ -16,36 +15,37 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+"""Dask executor."""
 import subprocess
-import warnings
+from typing import Any, Dict, Optional
 
-import distributed
+from distributed import Client, Future, as_completed
+from distributed.security import Security
 
 from airflow.configuration import conf
-from airflow.executors.base_executor import BaseExecutor
+from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType
+from airflow.models.taskinstance import TaskInstanceKeyType
 
 
 class DaskExecutor(BaseExecutor):
 """
 DaskExecutor submits tasks to a Dask Distributed cluster.
 """
 def __init__(self, cluster_address=None):
+super().__init__(parallelism=0)
 if cluster_address is None:
 cluster_address = conf.get('dask', 'cluster_address')
-if not cluster_address:
-raise ValueError(
-'Please provide a Dask cluster address in airflow.cfg')
+assert cluster_address, 'Please provide a Dask cluster address in airflow.cfg'
 self.cluster_address = cluster_address
 # ssl / tls parameters
 self.tls_ca = conf.get('dask', 'tls_ca')
 self.tls_key = conf.get('dask', 'tls_key')
 self.tls_cert = conf.get('dask', 'tls_cert')
-super().__init__(parallelism=0)
+self.client: Optional[Client] = None
+self.futures: Optional[Dict[Optional[Future], TaskInstanceKeyType]] = None
 
 Review comment:
   ```suggestion
   self.futures: Optional[Dict[Future, TaskInstanceKeyType]] = None
   ```
   
   Having Optional key type seems odd?
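The suggestion keeps `Optional` on the attribute (it is `None` until `start()` runs) but not on the dict's keys, since every stored key is a real future. A minimal sketch of that shape, assuming a hypothetical `TaskKey` alias in place of `TaskInstanceKeyType` and using the stdlib `concurrent.futures.Future` in place of Dask's `Future`:

```python
from concurrent.futures import Future
from typing import Dict, Optional, Tuple

# Hypothetical stand-in for TaskInstanceKeyType.
TaskKey = Tuple[str, str, str, int]


class ExecutorSketch:
    def __init__(self) -> None:
        # Optional at the attribute level only: None until start() is called.
        self.futures: Optional[Dict[Future, TaskKey]] = None

    def start(self) -> None:
        self.futures = {}

    def track(self, future: Future, key: TaskKey) -> None:
        # `is not None` narrows Optional[Dict[...]] to Dict[...] for mypy.
        assert self.futures is not None, "Executor should be started first."
        self.futures[future] = key
```

Note the `is not None` check: a bare truthiness test such as `if not self.futures` or `assert self.futures` also fails for the empty dict that `start()` assigns, so it appears to reject the first task submitted after a successful start.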


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-03 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r353154297
 
 

 ##
 File path: airflow/executors/celery_executor.py
 ##
 @@ -264,30 +284,44 @@ def sync(self):
 )
 continue
 key, state = key_and_state
-try:
-if self.last_state[key] != state:
-if state == celery_states.SUCCESS:
-self.success(key)
-del self.tasks[key]
-del self.last_state[key]
-elif state == celery_states.FAILURE:
-self.fail(key)
-del self.tasks[key]
-del self.last_state[key]
-elif state == celery_states.REVOKED:
-self.fail(key)
-del self.tasks[key]
-del self.last_state[key]
-else:
-self.log.info("Unexpected state: %s", state)
-self.last_state[key] = state
-except Exception:
-self.log.exception("Error syncing the Celery executor, ignoring it.")
-
-def end(self, synchronous=False):
+self.update_task_state(key, state)
+
+def update_task_state(self, key: TaskInstanceKeyType, state: str) -> None:
+"""Updates state of a single task."""
+# noinspection PyBroadException
+try:
+if self.last_state[key] != state:
+if state == celery_states.SUCCESS:
+self.success(key)
+del self.tasks[key]
+del self.last_state[key]
+elif state == celery_states.FAILURE:
+self.fail(key)
+del self.tasks[key]
+del self.last_state[key]
+elif state == celery_states.REVOKED:
+self.fail(key)
+del self.tasks[key]
+del self.last_state[key]
+else:
+self.log.info("Unexpected state: %s", state)
+self.last_state[key] = state
+except Exception:  # pylint: disable=broad-except
+self.log.exception("Error syncing the Celery executor, ignoring it.")
+
+def end(self, synchronous: bool = False) -> None:
 if synchronous:
-while any([
-task.state not in celery_states.READY_STATES
-for task in self.tasks.values()]):
+while any([task.state not in celery_states.READY_STATES for task in self.tasks.values()]):
 time.sleep(5)
 self.sync()
+
+def execute_async(self,
+  key: TaskInstanceKeyType,
+  command: CommandType,
+  queue: Optional[str] = None,
+  executor_config: Optional[Any] = None):
+"""Do not allow async execution for Celery executor."""
+raise AirflowException("No Async execution for Celery executor.")
+
+def terminate(self):
+"""Terminate the executor is not doing anything."""
 
 Review comment:
   ```suggestion
   pass
   ```


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-03 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r353152675
 
 

 ##
 File path: UPDATING.md
 ##
 @@ -41,6 +41,30 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### Changes to airflow package imports
+
+Importing `executor`, `hooks`, `macros`, `operators`, `sensors` packages no
+longer works indirectly via airflow package. For example this will not work:
+
+```python
+import airflow
+
+operator = airflow.operators.BashOperator(...)
 
 Review comment:
   Hasn't this already been deprecated? I guess this did actually work, but issued 
a warning before this PR?


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-03 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r353151215
 
 

 ##
 File path: airflow/executors/dask_executor.py
 ##
 @@ -66,20 +63,17 @@ def execute_async(self,
   command: CommandType,
   queue: Optional[str] = None,
   executor_config: Optional[Any] = None) -> None:
-if not self.futures:
-raise AirflowException("Executor should be started first.")
+assert self.futures, NOT_STARTED_MESSAGE
 
 Review comment:
   I think this is a good case for asserts.
   
   Under normal circumstances they should always pass; it's really only during 
development that you might get this wrong, so making these asserts that can be 
disabled by `python -O` seems like the right thing to do.
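The trade-off relies on `-O` compiling `assert` statements away entirely. A small sketch that runs the same snippet in a fresh interpreter with and without `-O` (the snippet and helper name are illustrative only):

```python
import os
import subprocess
import sys

# A failing assert followed by a line that only runs if the assert is skipped.
SNIPPET = "assert False, 'not started'\nprint('reached end')"


def run_snippet(optimize: bool) -> subprocess.CompletedProcess:
    """Run SNIPPET in a new interpreter, optionally with -O."""
    # Drop PYTHONOPTIMIZE so the non-optimized run really keeps its asserts.
    env = {k: v for k, v in os.environ.items() if k != "PYTHONOPTIMIZE"}
    args = [sys.executable]
    if optimize:
        args.append("-O")  # -O strips assert statements at compile time
    args += ["-c", SNIPPET]
    return subprocess.run(args, capture_output=True, text=True, env=env)


if __name__ == "__main__":
    print(run_snippet(optimize=False).returncode)
    print(run_snippet(optimize=True).stdout.strip())
```

Without `-O` the process exits non-zero with an `AssertionError`; with `-O` it reaches the `print`. That is harmless for "executor not started" programming errors, but it arguably makes asserts a weaker fit for user-facing configuration checks, such as the Dask `cluster_address` validation elsewhere in this PR.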


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352568182
 
 

 ##
 File path: airflow/settings.py
 ##
 @@ -68,13 +67,13 @@
 LOG_FORMAT = conf.get('core', 'log_format')
 SIMPLE_LOG_FORMAT = conf.get('core', 'simple_log_format')
 
-SQL_ALCHEMY_CONN = None  # type: Optional[str]
-DAGS_FOLDER = None  # type: Optional[str]
-PLUGINS_FOLDER = None  # type: Optional[str]
-LOGGING_CLASS_PATH = None  # type: Optional[str]
+SQL_ALCHEMY_CONN: Optional[str] = None
 
 Review comment:
   Unrelated changes in this file now?


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352567794
 
 

 ##
 File path: airflow/plugins_manager.py
 ##
 @@ -16,17 +15,18 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-import imp
+"""Manages all plugins."""
+# noinspection PyDeprecation
+import imp  # pylint: disable=deprecated-module
 import inspect
 import os
 import re
-from typing import Any, List
+import sys
+from typing import Any, Callable, List, Optional
 
 import pkg_resources
 
 from airflow import settings
-from airflow.models.baseoperator import BaseOperatorLink
 
 Review comment:
   We're removing this one and the tighter type integration because otherwise 
this pulls in all of airflow.models, which we want to avoid?
   
   Is this worth using an `if TYPE_CHECKING:` for?
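For reference, the `typing.TYPE_CHECKING` pattern keeps the precise annotation without paying the import cost at runtime: the guarded import is seen only by type checkers, and the annotation refers to the class by name in a string. A sketch of how the plugin lists could look under that approach (a suggestion, not what the PR does):

```python
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
    # Only mypy/IDEs evaluate this block; at runtime TYPE_CHECKING is False,
    # so airflow.models is never imported from here.
    from airflow.models.baseoperator import BaseOperatorLink

# String annotations mean the name does not have to exist at runtime.
global_operator_extra_links: List["BaseOperatorLink"] = []
operator_extra_links: List["BaseOperatorLink"] = []
```

Because the guarded import never executes, this module loads even in an environment where `airflow.models` is not importable, while mypy still checks the element type.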


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352567425
 
 

 ##
 File path: airflow/plugins_manager.py
 ##
 @@ -187,26 +192,29 @@ def make_module(name, objects):
 macros_modules = []
 
 # Plugin components to integrate directly
-admin_views = []  # type: List[Any]
-flask_blueprints = []  # type: List[Any]
-menu_links = []  # type: List[Any]
-flask_appbuilder_views = []  # type: List[Any]
-flask_appbuilder_menu_links = []  # type: List[Any]
-stat_name_handler = None  # type: Any
-global_operator_extra_links = []  # type: List[BaseOperatorLink]
-operator_extra_links = []  # type: List[BaseOperatorLink]
+admin_views: List[Any] = []
+flask_blueprints: List[Any] = []
+menu_links: List[Any] = []
+flask_appbuilder_views: List[Any] = []
+flask_appbuilder_menu_links: List[Any] = []
+stat_name_handler: Any = None
 
 Review comment:
   ```suggestion
   stat_name_handler: Optional[Callable[[str], str]] = None
   ```
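With the suggested type, a plugin's handler is a plain `str -> str` function and callers must handle the "not registered" case explicitly. A minimal sketch; `normalize_stat_name` is a hypothetical helper, not an Airflow function:

```python
from typing import Callable, Optional

# Plugin-provided hook; None means no custom handler was registered.
stat_name_handler: Optional[Callable[[str], str]] = None


def normalize_stat_name(name: str) -> str:
    """Hypothetical helper: apply the plugin's handler only if one is set."""
    if stat_name_handler is None:
        return name
    return stat_name_handler(name)
```

Compared with `Any`, mypy would now flag a handler with the wrong signature (for example one taking two arguments) and would reject calling `stat_name_handler(name)` without the `None` check.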


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352566346
 
 

 ##
 File path: airflow/models/dagbag.py
 ##
 @@ -406,8 +404,10 @@ def collect_dags(
 FileLoadStat = namedtuple(
 'FileLoadStat', "file duration dag_num task_num dags")
 
+from airflow.utils.file import correct_maybe_zipped
 dag_folder = correct_maybe_zipped(dag_folder)
 
+from airflow.utils.file import list_py_file_paths
 
 Review comment:
   Small nit: we could combine the two imports into a single line.


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352565842
 
 

 ##
 File path: airflow/models/dag.py
 ##
 @@ -1254,9 +1253,11 @@ def run(
 """
 from airflow.jobs import BackfillJob
 if not executor and local:
+from airflow.executors.local_executor import LocalExecutor
 executor = LocalExecutor()
 elif not executor:
-executor = get_default_executor()
+from airflow.executors.executor_loader import ExecutorLoader
 
 Review comment:
   (Not for this PR, but I do wonder if we should remove this functionality as 
it feels somewhat out of place with the "normal" way that `airflow tasks run` 
is used. It's almost closer to `airflow tasks submit` in nature. But it also 
bypasses lots of the scheduler "gating" that maybe it shouldn't.)


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352565100
 
 

 ##
 File path: airflow/logging_config.py
 ##
 @@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 
 Review comment:
   Could you undo this change as it's the only one in the file please?


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352563810
 
 

 ##
 File path: airflow/executors/local_executor.py
 ##
 @@ -43,40 +42,44 @@
 This option could lead to the unification of the executor implementations, running
 locally, into just one `LocalExecutor` with multiple modes.
 """
-
-import multiprocessing
 import subprocess
-from queue import Empty
-
-from airflow.executors.base_executor import BaseExecutor
+from multiprocessing import Manager, Process
+from multiprocessing.managers import SyncManager
+from queue import Empty, Queue  # pylint: disable=unused-import  # noqa: F401
+from typing import Any, List, Optional, Tuple, Union  # pylint: disable=unused-import # noqa: F401
+
+from airflow import AirflowException
+from airflow.executors.base_executor import NOT_STARTED_MESSAGE, PARALLELISM, BaseExecutor, CommandType
+from airflow.models.taskinstance import (  # pylint: disable=unused-import # noqa: F401
+TaskInstanceKeyType, TaskInstanceStateType,
+)
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
+# This is a work to be executed by a worker.
+# It can Key and Command - but it can also be None, None which is actually a
+# "Poison Pill" - worker seeing Poison Pill should take the pill and ... die instantly.
+ExecutorWorkTodo = Tuple[Optional[TaskInstanceKeyType], Optional[CommandType]]
 
 Review comment:
   ```suggestion
   ExecutorWorkType = Tuple[Optional[TaskInstanceKeyType], Optional[CommandType]]
   ```
   
   I think?
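The alias describes a queue item that is either a real `(key, command)` pair or the `(None, None)` poison pill. A simplified sketch of that protocol, assuming hypothetical `TaskKey`/`CommandType` aliases and using threads instead of the `multiprocessing` workers the LocalExecutor actually uses:

```python
import queue
import threading
from typing import List, Optional, Tuple

# Hypothetical stand-ins for TaskInstanceKeyType and CommandType.
TaskKey = Tuple[str, str]
CommandType = List[str]
# (None, None) is the poison pill that tells a worker to exit.
ExecutorWorkType = Tuple[Optional[TaskKey], Optional[CommandType]]


def worker(tasks: "queue.Queue[ExecutorWorkType]", done: List[TaskKey]) -> None:
    while True:
        key, command = tasks.get()
        try:
            if key is None or command is None:
                return  # poison pill received: stop this worker
            done.append(key)  # a real executor would run `command` here
        finally:
            tasks.task_done()


def run_all(work: List[Tuple[TaskKey, CommandType]], num_workers: int = 2) -> List[TaskKey]:
    tasks: "queue.Queue[ExecutorWorkType]" = queue.Queue()
    done: List[TaskKey] = []
    threads = [threading.Thread(target=worker, args=(tasks, done))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for key, command in work:
        tasks.put((key, command))
    # One pill per worker; FIFO order guarantees all real work is taken first.
    for _ in threads:
        tasks.put((None, None))
    for t in threads:
        t.join()
    return done
```

Naming the alias `...Type` matches the other aliases in the file (`CommandType`, `TaskInstanceKeyType`), which is presumably the point of the suggestion.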


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352563506
 
 

 ##
 File path: airflow/executors/local_executor.py
 ##
 @@ -43,40 +42,44 @@
 This option could lead to the unification of the executor implementations, running
 locally, into just one `LocalExecutor` with multiple modes.
 """
-
-import multiprocessing
 import subprocess
-from queue import Empty
-
-from airflow.executors.base_executor import BaseExecutor
+from multiprocessing import Manager, Process
+from multiprocessing.managers import SyncManager
+from queue import Empty, Queue  # pylint: disable=unused-import  # noqa: F401
+from typing import Any, List, Optional, Tuple, Union  # pylint: disable=unused-import # noqa: F401
+
+from airflow import AirflowException
 
 Review comment:
   Did we not say that inside the core this should be imported from 
`airflow.exceptions` directly?


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352563040
 
 

 ##
 File path: airflow/executors/kubernetes_executor.py
 ##
 @@ -601,18 +629,19 @@ def terminate(self):
 
 class KubernetesExecutor(BaseExecutor, LoggingMixin):
 """Executor for Kubernetes"""
+
 def __init__(self):
-self.kube_config = KubeConfig()
-self.task_queue = None
-self.result_queue = None
-self.kube_scheduler = None
-self.kube_client = None
-self.worker_uuid = None
+self.kube_config: KubeConfig = KubeConfig()
 
 Review comment:
   ```suggestion
   self.kube_config = KubeConfig()
   ```
   
   don't need this one (but do for the rest of the ones in this fn)


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352562023
 
 

 ##
 File path: airflow/executors/kubernetes_executor.py
 ##
 @@ -241,17 +253,23 @@ def _validate(self):
 
 class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 """Watches for Kubernetes jobs"""
-def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config):
+def __init__(self,
+ namespace: str,
+ watcher_queue: 'Queue[KubernetesWatchType]',
+ resource_version: str,
+ worker_uuid: Optional[str],
+ kube_config: Configuration):
 multiprocessing.Process.__init__(self)
-self.namespace = namespace
-self.worker_uuid = worker_uuid
-self.watcher_queue = watcher_queue
-self.resource_version = resource_version
-self.kube_config = kube_config
+self.namespace: str = namespace
 
 Review comment:
   
   https://mypy.readthedocs.io/en/stable/class_basics.html#instance-and-class-attributes says we don't need to do this.


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352561195
 
 

 ##
 File path: airflow/executors/dask_executor.py
 ##
 @@ -55,23 +58,28 @@ def start(self):
 else:
 security = None
 
-self.client = distributed.Client(self.cluster_address, security=security)
+self.client = Client(self.cluster_address, security=security)
 self.futures = {}
 
-def execute_async(self, key, command, queue=None, executor_config=None):
-if queue is not None:
-warnings.warn(
-'DaskExecutor does not support queues. '
-'All tasks will be run in the same cluster'
-)
+def execute_async(self,
+  key: TaskInstanceKeyType,
+  command: CommandType,
+  queue: Optional[str] = None,
+  executor_config: Optional[Any] = None) -> None:
+if not self.futures:
+raise AirflowException("Executor should be started first.")
 
 def airflow_run():
 return subprocess.check_call(command, close_fds=True)
 
+if not self.client:
+raise AirflowException("The Dask executor has not been started yet!")
 future = self.client.submit(airflow_run, pure=False)
 self.futures[future] = key
 
-def _process_future(self, future):
+def _process_future(self, future: Future) -> None:
+if not self.futures:
 
 Review comment:
   There are mixed styles in this file: some use `if not self.x`, some use `assert 
self.x`. We should be consistent.


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352499486
 
 

 ##
 File path: airflow/executors/celery_executor.py
 ##
 @@ -264,30 +284,45 @@ def sync(self):
 )
 continue
 key, state = key_and_state
-try:
-if self.last_state[key] != state:
-if state == celery_states.SUCCESS:
-self.success(key)
-del self.tasks[key]
-del self.last_state[key]
-elif state == celery_states.FAILURE:
-self.fail(key)
-del self.tasks[key]
-del self.last_state[key]
-elif state == celery_states.REVOKED:
-self.fail(key)
-del self.tasks[key]
-del self.last_state[key]
-else:
-self.log.info("Unexpected state: %s", state)
-self.last_state[key] = state
-except Exception:
-self.log.exception("Error syncing the Celery executor, ignoring it.")
-
-def end(self, synchronous=False):
+self.update_task_state(key, state)
+
+# noinspection PyUnreachableCode
 
 Review comment:
   What does it think is unreachable in here? 🤔 


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352498858
 
 

 ##
 File path: airflow/executors/celery_executor.py
 ##
 @@ -57,17 +61,17 @@
 
 
 @app.task
-def execute_command(command_to_exec):
+def task(command_to_exec: str):
+"""Executes command."""
 log = LoggingMixin().log
 log.info("Executing command in Celery: %s", command_to_exec)
 env = os.environ.copy()
 try:
 subprocess.check_call(command_to_exec, stderr=subprocess.STDOUT,
   close_fds=True, env=env)
 except subprocess.CalledProcessError as e:
-log.exception('execute_command encountered a CalledProcessError')
+log.exception('execute command encountered a CalledProcessError')
 
 Review comment:
   ^^


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352498412
 
 

 ##
 File path: airflow/executors/base_executor.py
 ##
 @@ -16,67 +14,86 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+"""
+Base executor - this is the base class for all the implemented executors.
+"""
 from collections import OrderedDict
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
 
-# To avoid circular imports
-import airflow.utils.dag_processing
-from airflow.configuration import conf
 
 Review comment:
   I would have thought that we want this import, not `from airflow import 
conf`?
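The difference between the two forms matters during a circular import. The sketch below uses a throwaway package named `pkg`, a hypothetical layout and not Airflow's. The package `__init__` imports a submodule before it binds `conf`. Inside that submodule, `from pkg import conf` therefore fails against the half-initialised package. In contrast, `from pkg.configuration import conf` only needs `pkg.configuration` itself to load.

```python
import importlib
import sys
import tempfile
import textwrap
from pathlib import Path


def make_package(root: Path, conf_import: str) -> None:
    """Create a hypothetical package `pkg` whose __init__ triggers an import cycle."""
    pkg = root / "pkg"
    pkg.mkdir()
    # pkg/__init__.py imports `executor` *before* binding `conf` at package level.
    (pkg / "__init__.py").write_text(textwrap.dedent("""
        from pkg import executor
        from pkg.configuration import conf
    """))
    (pkg / "configuration.py").write_text("conf = {'parallelism': 32}\n")
    # pkg/executor.py re-enters the partially initialised package.
    (pkg / "executor.py").write_text(conf_import + "\n")


def try_import(conf_import: str) -> str:
    with tempfile.TemporaryDirectory() as tmp:
        make_package(Path(tmp), conf_import)
        sys.path.insert(0, tmp)
        try:
            importlib.import_module("pkg")
            return "ok"
        except ImportError as exc:
            return f"ImportError: {exc}"
        finally:
            sys.path.remove(tmp)
            # Drop cached modules so the next attempt starts from scratch.
            for name in [m for m in sys.modules if m == "pkg" or m.startswith("pkg.")]:
                del sys.modules[name]


print(try_import("from pkg import conf"))                # fails: pkg.conf not bound yet
print(try_import("from pkg.configuration import conf"))  # succeeds
```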


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352497006
 
 

 ##
 File path: airflow/__init__.py
 ##
 @@ -44,23 +44,8 @@
 
 settings.initialize()
 
-login = None  # type: Optional[Callable]
+from airflow.plugins_manager import integrate_plugins
 
 Review comment:
   Oh, because importing the plugins manager needs settings to have been loaded. Right.
   
   _Somewhat_ out of scope, so this can probably stay as it is for now, but we 
could move all of the top-level code in airflow.plugins_manager into this new 
`integrate_plugins` method.
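Here is a minimal sketch of that idea. It assumes a module that currently does its discovery work at import time. That work moves into a function guarded so that repeated calls are no-ops. Nothing then runs until the caller asks for it, which `airflow/__init__.py` already does after `settings.initialize()`. `_discover_plugins` and `_plugins` are hypothetical names, not Airflow code.

```python
from typing import List, Optional

# Module-level state stays empty at import time; nothing is discovered yet.
_plugins: Optional[List[str]] = None


def _discover_plugins() -> List[str]:
    """Hypothetical stand-in for the scanning that currently runs at import time."""
    return ["example_plugin"]


def integrate_plugins() -> List[str]:
    """Discover plugins on the first call only; later calls reuse the cached result."""
    global _plugins
    if _plugins is None:
        _plugins = _discover_plugins()
    return _plugins
```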


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352495420
 
 

 ##
 File path: airflow/__init__.py
 ##
 @@ -44,23 +44,8 @@
 
 settings.initialize()
 
-login = None  # type: Optional[Callable]
+from airflow.plugins_manager import integrate_plugins
 
 Review comment:
   Any reason not to have this import at the top with the normal block?
   
   Because of L31 we've disabled "wrong-import-position" for the entire file.


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352494761
 
 

 ##
 File path: airflow/__init__.py
 ##
 @@ -44,23 +44,8 @@
 
 settings.initialize()
 
-login = None  # type: Optional[Callable]
+from airflow.plugins_manager import integrate_plugins
 
-from airflow import executors
-from airflow import hooks
-from airflow import macros
-from airflow import operators
-from airflow import sensors
+login: Optional[Callable] = None
 
-
-class AirflowMacroPlugin:
-# pylint: disable=missing-docstring
-def __init__(self, namespace):
-self.namespace = namespace
-
-
-operators._integrate_plugins()  # pylint: disable=protected-access
-sensors._integrate_plugins()  # pylint: disable=protected-access
-hooks._integrate_plugins()  # pylint: disable=protected-access
-executors._integrate_plugins()  # pylint: disable=protected-access
-macros._integrate_plugins()  # pylint: disable=protected-access
+integrate_plugins()
 
 Review comment:
   I guess the main difference here is this no longer works:
   
   ```python
   import airflow
   airflow.operators.BashOperator
   ```
   
   However, given that this is _already_ deprecated, and `import airflow; 
airflow.operators.bash_operator.BashOperator` doesn't work on master, I _think_ 
this is a safe change.
   
   Though it probably needs an explicit mention in UPDATING.md (if it's not there 
already).
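The first form breaks because Python binds a submodule as an attribute of its parent package only after something imports that submodule. The sketch below uses a throwaway package whose names (`demo_pkg`, `operators`, `bash_operator`) are hypothetical and only mirror the layout. It shows that once `__init__` stops importing `operators`, attribute access fails until an explicit import such as `from demo_pkg.operators.bash_operator import BashOperator` runs.

```python
import importlib
import sys
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    pkg = Path(tmp) / "demo_pkg"
    (pkg / "operators").mkdir(parents=True)
    # The package __init__ no longer does `from demo_pkg import operators`.
    (pkg / "__init__.py").write_text("")
    (pkg / "operators" / "__init__.py").write_text("")
    (pkg / "operators" / "bash_operator.py").write_text("class BashOperator:\n    pass\n")
    sys.path.insert(0, tmp)
    try:
        demo_pkg = importlib.import_module("demo_pkg")
        # Nothing has imported demo_pkg.operators yet, so the attribute is missing.
        before = hasattr(demo_pkg, "operators")
        # An explicit import binds the submodule onto its parent package.
        from demo_pkg.operators.bash_operator import BashOperator
        after = hasattr(demo_pkg, "operators")
    finally:
        sys.path.remove(tmp)
        for name in [m for m in sys.modules if m.split(".")[0] == "demo_pkg"]:
            del sys.modules[name]

print(before, after)  # False True
```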


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352492591
 
 

 ##
 File path: airflow/__init__.py
 ##
 @@ -44,23 +44,8 @@
 
 settings.initialize()
 
-login = None  # type: Optional[Callable]
+from airflow.plugins_manager import integrate_plugins
 
-from airflow import executors
-from airflow import hooks
-from airflow import macros
-from airflow import operators
-from airflow import sensors
+login: Optional[Callable] = None
 
-
-class AirflowMacroPlugin:
 
 Review comment:
   I think this is probably fine -- this class hasn't done anything since 
mid-2015, so I can't see why someone might be using it.


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-12-02 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352491482
 
 

 ##
 File path: airflow/__init__.py
 ##
 @@ -44,23 +44,8 @@
 
 settings.initialize()
 
-login = None  # type: Optional[Callable]
+from airflow.plugins_manager import integrate_plugins
 
-from airflow import executors
-from airflow import hooks
-from airflow import macros
-from airflow import operators
-from airflow import sensors
+login: Optional[Callable] = None
 
-
-class AirflowMacroPlugin:
 
 Review comment:
   Hmm, might be worth mentioning? It's not something we mention in our docs 
anywhere, but...?


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299444
 
 

 ##
 File path: airflow/executors/kubernetes_executor.py
 ##
 @@ -703,10 +735,12 @@ def _create_or_update_secret(secret_name, secret_path):
 for service_account in name_path_pair_list:
 _create_or_update_secret(service_account['name'], service_account['path'])
 
-def start(self):
+def start(self) -> None:
 """Starts the executor"""
 self.log.info('Start Kubernetes executor')
 self.worker_uuid = KubeWorkerIdentifier.get_or_create_current_kube_worker_uuid()
+if not self.worker_uuid:
+raise AirflowException("Could not get worker_uuid")
 
 Review comment:
   Shouldn't this exception be inside `get_or_create_current_kube_worker_uuid` 
instead?
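Here is a sketch of that suggestion, with stand-in names. If the helper raises when it cannot produce a UUID, its return type narrows from `Optional[str]` to `str`. Every caller, not only `start()`, then gets the check for free. `_lookup_or_create` is a hypothetical stand-in for the real lookup. `AirflowException` is redefined locally so the snippet runs on its own.

```python
import uuid
from typing import Optional


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException."""


def _lookup_or_create() -> Optional[str]:
    """Hypothetical stand-in for the real lookup; may return None on failure."""
    return str(uuid.uuid4())


def get_or_create_current_kube_worker_uuid() -> str:
    worker_uuid = _lookup_or_create()
    if not worker_uuid:
        # Raising here means callers never receive None, so mypy needs no extra check.
        raise AirflowException("Could not get worker_uuid")
    return worker_uuid
```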


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299553
 
 

 ##
 File path: airflow/executors/local_executor.py
 ##
 @@ -43,40 +42,44 @@
 This option could lead to the unification of the executor implementations, running
 locally, into just one `LocalExecutor` with multiple modes.
 """
-
-import multiprocessing
 import subprocess
-from queue import Empty
-
-from airflow.executors.base_executor import BaseExecutor
+from multiprocessing import Manager, Process
+from multiprocessing.managers import SyncManager
+from queue import Empty, Queue  # pylint: disable=unused-import  # noqa: F401
+from typing import Any, List, Optional, Tuple, Union  # pylint: disable=unused-import # noqa: F401
+
+from airflow import AirflowException
+from airflow.executors.base_executor import NOT_STARTED_MESSAGE, PARALLELISM, BaseExecutor, CommandType
+from airflow.models.taskinstance import (  # pylint: disable=unused-import # noqa: F401
+TaskInstanceKey, TaskInstanceState,
+)
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
+# This is a work to be executed by a worker.
+# It can Key and Command - but it can also be None, None which is actually a
+# "Poison Pill" - worker seeing Poison Pill should take the pill and ... die instantly.
+ExecutorWorkTodo = Tuple[Optional[TaskInstanceKey], Optional[CommandType]]
 
-class LocalWorker(multiprocessing.Process, LoggingMixin):
 
-"""LocalWorker Process implementation to run airflow commands. Executes the given
-command and puts the result into a result queue when done, terminating execution."""
+class LocalWorkerBase(Process, LoggingMixin):
+"""
+LocalWorker Process implementation to run airflow commands. Executes the given
 
 Review comment:
   The docstring doesn't match the class name (`LocalWorkerBase`).


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299790
 
 

 ##
 File path: airflow/kubernetes/pod_generator.py
 ##
 @@ -26,35 +26,7 @@
 
 import kubernetes.client.models as k8s
 
-from airflow.executors import Executors
-
-
-class PodDefaults:
-"""
-Static defaults for the PodGenerator
-"""
-XCOM_MOUNT_PATH = '/airflow/xcom'
-SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
-XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 30; done;'
-VOLUME_MOUNT = k8s.V1VolumeMount(
-name='xcom',
-mount_path=XCOM_MOUNT_PATH
-)
-VOLUME = k8s.V1Volume(
-name='xcom',
-empty_dir=k8s.V1EmptyDirVolumeSource()
-)
-SIDECAR_CONTAINER = k8s.V1Container(
-name=SIDECAR_CONTAINER_NAME,
-command=['sh', '-c', XCOM_CMD],
-image='alpine',
-volume_mounts=[VOLUME_MOUNT],
-resources=k8s.V1ResourceRequirements(
-requests={
-"cpu": "1m",
-}
-),
-)
+from airflow.kubernetes.pod_defaults import PodDefaults
 
 Review comment:
   Why did we have to move this one out? Given that this module has almost no 
imports, it seems like we shouldn't have to touch it.


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299109
 
 

 ##
 File path: airflow/executors/base_executor.py
 ##
 @@ -16,67 +14,82 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+"""Base executor."""
 from collections import OrderedDict
+from queue import Queue
+from typing import Any, Dict, Optional, Set, Tuple
 
-# To avoid circular imports
-import airflow.utils.dag_processing
-from airflow.configuration import conf
+from airflow import AirflowException, LoggingMixin, conf
+from airflow.executors.executor_type_aliases import CommandType, ExecutorKeyType
+from airflow.models import TaskInstance
 from airflow.stats import Stats
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.dag_processing import SimpleTaskInstance
 from airflow.utils.state import State
 
-PARALLELISM = conf.getint('core', 'PARALLELISM')
+PARALLELISM: int = conf.getint('core', 'PARALLELISM')
 
 
 class BaseExecutor(LoggingMixin):
-
-def __init__(self, parallelism=PARALLELISM):
-"""
-Class to derive in order to interface with executor-type systems
-like Celery, Yarn and the likes.
-
-:param parallelism: how many jobs should run at one time. Set to
-``0`` for infinity
-:type parallelism: int
-"""
-self.parallelism = parallelism
-self.queued_tasks = OrderedDict()
-self.running = {}
-self.event_buffer = {}
+"""
+Class to derive in order to interface with executor-type systems
+like Celery, Yarn and the likes.
+
+:param parallelism: how many jobs should run at one time. Set to
+``0`` for infinity
+:type parallelism: int
+"""
+
+def __init__(self, parallelism: int = PARALLELISM):
+super().__init__()
+self.parallelism: int = parallelism
+self.queued_tasks: OrderedDict[
+ExecutorKeyType,
+Tuple[CommandType, int, Queue, SimpleTaskInstance]] \
+= OrderedDict()
+self.running: Set[ExecutorKeyType] = set()
+self.event_buffer: Dict[ExecutorKeyType, str] = {}
 
 def start(self):  # pragma: no cover
 """
-Executors may need to get things started. For example LocalExecutor
-starts N workers.
+Executors may need to get things started.
 """
 
-def queue_command(self, simple_task_instance, command, priority=1, queue=None):
+def queue_command(self,
+  simple_task_instance: SimpleTaskInstance,
+  command_to_run: CommandType,
+  priority: int = 1,
+  queue: Optional[Queue] = None):
+"""Queues command to task"""
 key = simple_task_instance.key
-if key not in self.queued_tasks and key not in self.running:
-self.log.info("Adding to queue: %s", command)
-self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
+if key not in self.queued_tasks.keys() and key not in self.running:
 
 Review comment:
   Is this going to make it slower -- i.e. does this generate a list of keys and 
then do a list search for the key vs just a direct hash lookup?
   
   What was the error? This feels like a bug in mypy rather than a bug in our 
code.
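On the performance question: in Python 3, `dict.keys()` returns a view rather than a list. Membership on a keys view is a hash lookup, the same as `key in dict`, so the extra `.keys()` should only add the cost of one method call. The check below uses made-up tuple keys. Timings vary by machine, so none are quoted.

```python
import timeit
from collections import OrderedDict

# Made-up tuple keys, loosely shaped like task-instance keys.
queued_tasks = OrderedDict((("dag", f"task_{i}", 1), i) for i in range(100_000))
missing = ("dag", "not_there", 1)

keys = queued_tasks.keys()
# A keys view is not a list copy: it reflects later changes to the dict.
queued_tasks[missing] = -1
assert missing in keys
del queued_tasks[missing]

# Both checks are hash lookups, so neither scans the 100k entries.
t_dict = timeit.timeit(lambda: missing in queued_tasks, number=10_000)
t_keys = timeit.timeit(lambda: missing in queued_tasks.keys(), number=10_000)
print(f"key in dict: {t_dict:.4f}s  key in dict.keys(): {t_keys:.4f}s")
```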


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299235
 
 

 ##
 File path: airflow/executors/base_executor.py
 ##
 @@ -16,67 +14,82 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+"""Base executor."""
 from collections import OrderedDict
+from queue import Queue
+from typing import Any, Dict, Optional, Set, Tuple
 
-# To avoid circular imports
-import airflow.utils.dag_processing
-from airflow.configuration import conf
+from airflow import AirflowException, LoggingMixin, conf
+from airflow.executors.executor_type_aliases import CommandType, ExecutorKeyType
+from airflow.models import TaskInstance
 from airflow.stats import Stats
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.dag_processing import SimpleTaskInstance
 from airflow.utils.state import State
 
-PARALLELISM = conf.getint('core', 'PARALLELISM')
+PARALLELISM: int = conf.getint('core', 'PARALLELISM')
 
 
 class BaseExecutor(LoggingMixin):
-
-def __init__(self, parallelism=PARALLELISM):
-"""
-Class to derive in order to interface with executor-type systems
-like Celery, Yarn and the likes.
-
-:param parallelism: how many jobs should run at one time. Set to
-``0`` for infinity
-:type parallelism: int
-"""
-self.parallelism = parallelism
-self.queued_tasks = OrderedDict()
-self.running = {}
-self.event_buffer = {}
+"""
+Class to derive in order to interface with executor-type systems
+like Celery, Yarn and the likes.
+
+:param parallelism: how many jobs should run at one time. Set to
+``0`` for infinity
+:type parallelism: int
+"""
+
+def __init__(self, parallelism: int = PARALLELISM):
+super().__init__()
+self.parallelism: int = parallelism
+self.queued_tasks: OrderedDict[
+ExecutorKeyType,
+Tuple[CommandType, int, Queue, SimpleTaskInstance]] \
+= OrderedDict()
+self.running: Set[ExecutorKeyType] = set()
+self.event_buffer: Dict[ExecutorKeyType, str] = {}
 
 def start(self):  # pragma: no cover
 """
-Executors may need to get things started. For example LocalExecutor
-starts N workers.
+Executors may need to get things started.
 """
 
-def queue_command(self, simple_task_instance, command, priority=1, queue=None):
+def queue_command(self,
+  simple_task_instance: SimpleTaskInstance,
+  command_to_run: CommandType,
+  priority: int = 1,
+  queue: Optional[Queue] = None):
+"""Queues command to task"""
 key = simple_task_instance.key
-if key not in self.queued_tasks and key not in self.running:
-self.log.info("Adding to queue: %s", command)
-self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
+if key not in self.queued_tasks.keys() and key not in self.running:
 
 Review comment:
   Though why does it complain here but not in `has_task`?


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352300164
 
 

 ##
 File path: docs/plugins.rst
 ##
 @@ -167,7 +167,7 @@ definitions in Airflow.
 from airflow.models.baseoperator import BaseOperatorLink
 from airflow.operators.gcs_to_s3 import GoogleCloudStorageToS3Operator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
-from airflow.executors.base_executor import BaseExecutor
+from airflow.executors.base_executors import BaseExecutor
 
 Review comment:
   This counts as a breaking change - someone has probably created a custom 
Executor out there. Is it possible not to change this module name, please?
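If the rename is kept, one common way to avoid breaking existing custom executors is a shim at the old path. The shim re-exports from the new module and emits a `DeprecationWarning`. The sketch below uses a throwaway package: `demo_executors` is hypothetical, and only the two module names mirror the diff.

```python
import importlib
import sys
import tempfile
import warnings
from pathlib import Path

SHIM = '''\
"""Deprecated: use demo_executors.base_executors instead."""
import warnings

from demo_executors.base_executors import BaseExecutor  # noqa: F401

warnings.warn(
    "demo_executors.base_executor is deprecated; use demo_executors.base_executors",
    DeprecationWarning,
    stacklevel=2,
)
'''

with tempfile.TemporaryDirectory() as tmp:
    pkg = Path(tmp) / "demo_executors"
    pkg.mkdir()
    (pkg / "__init__.py").write_text("")
    (pkg / "base_executors.py").write_text("class BaseExecutor:\n    pass\n")
    (pkg / "base_executor.py").write_text(SHIM)  # old module name kept as a shim
    sys.path.insert(0, tmp)
    try:
        new = importlib.import_module("demo_executors.base_executors")
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            old = importlib.import_module("demo_executors.base_executor")
    finally:
        sys.path.remove(tmp)
        for name in [m for m in sys.modules if m.split(".")[0] == "demo_executors"]:
            del sys.modules[name]

# Both import paths resolve to the very same class object.
print(old.BaseExecutor is new.BaseExecutor)                             # True
print(any(issubclass(w.category, DeprecationWarning) for w in caught))  # True
```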


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299475
 
 

 ##
 File path: airflow/executors/kubernetes_executor.py
 ##
 @@ -715,6 +749,10 @@ def start(self):
 KubeResourceVersion.reset_resource_version()
 self.task_queue = self._manager.Queue()
 self.result_queue = self._manager.Queue()
+if not self.task_queue:
+raise AirflowException("Could not start task_queue")
 
 Review comment:
   Is it actually possible to hit this line, or is it just to make mypy happy 
and any exceptions would actually come from `_manager.Queue()` directly?
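For what it's worth, a `queue.Queue` is truthy even when empty, because it defines neither `__bool__` nor `__len__`. A check like `if not self.task_queue` can therefore only fire while the attribute is still `None`. A genuine failure would surface as an exception from the `Queue()` call itself. The sketch uses a plain `queue.Queue`. That `Manager().Queue()` proxies behave the same way is an assumption, not verified here.

```python
import queue
from typing import Optional

q: "queue.Queue[str]" = queue.Queue()
# queue.Queue defines neither __bool__ nor __len__, so every instance is truthy,
# even while it is empty.
print(bool(q), q.empty())  # True True

task_queue: Optional["queue.Queue[str]"] = None
# `is None` states exactly what is being checked, and mypy narrows Optional on it.
if task_queue is None:
    task_queue = queue.Queue()
```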


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299255
 
 

 ##
 File path: airflow/executors/base_executor.py
 ##
 @@ -132,47 +151,64 @@ def heartbeat(self):
 self.log.debug("Calling the %s sync method", self.__class__)
 self.sync()
 
-def trigger_tasks(self, open_slots):
+def trigger_tasks(self, open_slots: int) -> None:
 """
-Trigger tasks
+Triggers tasks
 
 :param open_slots: Number of open slots
-:return:
 """
 sorted_queue = sorted(
 [(k, v) for k, v in self.queued_tasks.items()],
 key=lambda x: x[1][1],
 reverse=True)
 for _ in range(min((open_slots, len(self.queued_tasks)))):
-key, (command, _, queue, simple_ti) = sorted_queue.pop(0)
+key, (command, _, _, simple_ti) = sorted_queue.pop(0)
 self.queued_tasks.pop(key)
-self.running[key] = command
+self.running.add(key)
 self.execute_async(key=key,
command=command,
-   queue=queue,
 
 Review comment:
   This feels like an API change to the executor.
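One way to see the concern: a custom executor written against the old signature still declares `queue`. Once the base class stops forwarding it, the subclass silently gets its default instead of the queue the task was routed to. The classes below are hypothetical stand-ins (`OldStyleExecutor` is illustrative, and the real `BaseExecutor` does much more). `queue` is modelled as an optional queue name.

```python
from typing import Any, List, Optional


class BaseExecutor:
    """Stand-in for the changed trigger path, where `queue` is no longer forwarded."""

    def trigger(self, key: str, command: List[str], queue: Optional[str]) -> None:
        # `queue` is accepted here but dropped, as in the diff above.
        self.execute_async(key=key, command=command)

    def execute_async(self, key: str, command: List[str],
                      executor_config: Optional[Any] = None) -> None:
        raise NotImplementedError()


class OldStyleExecutor(BaseExecutor):
    """A third-party executor written against the old execute_async signature."""

    def __init__(self) -> None:
        self.seen_queues: List[Optional[str]] = []

    def execute_async(self, key, command, queue=None, executor_config=None):
        # Routing logic that depends on `queue` now always sees the default.
        self.seen_queues.append(queue)


executor = OldStyleExecutor()
executor.trigger("ti_key", ["echo", "hello"], queue="high_memory")
print(executor.seen_queues)  # [None]
```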


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299639
 
 

 ##
 File path: airflow/executors/local_executor.py
 ##
 @@ -201,31 +273,50 @@ def sync(self):
 break
 
 def end(self):
-# Sending poison pill to all worker
+"""Ends the executor. Sends the poison pill to all workers."""
 for _ in self.executor.workers:
 self.queue.put((None, None))
 
 # Wait for commands to finish
 self.queue.join()
 self.executor.sync()
 
-def start(self):
-self.manager = multiprocessing.Manager()
+def start(self) -> None:
+"""Starts the executor"""
+self.manager = Manager()
 self.result_queue = self.manager.Queue()
 self.workers = []
 self.workers_used = 0
 self.workers_active = 0
-self.impl = (LocalExecutor._UnlimitedParallelism(self) if self.parallelism == 0
- else LocalExecutor._LimitedParallelism(self))
+self.impl = (LocalExecutor.UnlimitedParallelism(self) if self.parallelism == 0
+ else LocalExecutor.LimitedParallelism(self))
 
 self.impl.start()
 
-def execute_async(self, key, command, queue=None, executor_config=None):
-self.impl.execute_async(key=key, command=command)
+def execute_async(self, key: TaskInstanceKey,
+  command: CommandType,
+  executor_config: Optional[Any] = None) -> None:
+"""Execute asynchronously."""
+if not self.impl:
+raise AirflowException(NOT_STARTED_MESSAGE)
+self.impl.execute_async(key=key, command=command, executor_config=executor_config)
 
-def sync(self):
+def sync(self) -> None:
+"""
+Sync will get called periodically by the heartbeat method.
+"""
+if not self.impl:
+raise AirflowException(NOT_STARTED_MESSAGE)
 self.impl.sync()
 
-def end(self):
+def end(self) -> None:
+"""
+Ends the executor.
+:return:
+"""
+if not self.impl:
+raise AirflowException("Executor should be started first")
 self.impl.end()
+if not self.manager:
+raise AirflowException("Executor should be started first")
 
 Review comment:
   Should use NOT_STARTED_MESSAGE here (or the custom exception subclass)


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352298971
 
 

 ##
 File path: airflow/executors/all_executors.py
 ##
 @@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""All executors."""
+from typing import Optional
+
+from airflow import AirflowException, LoggingMixin, conf
+from airflow.executors.base_executor import BaseExecutor
+
+
+class AllExecutors:
 
 Review comment:
   ```suggestion
   class Loader:
   ```
   
   (and rename to loader.py)


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299315
 
 

 ##
 File path: airflow/executors/base_executor.py
 ##
 @@ -185,25 +221,22 @@ def get_event_buffer(self, dag_ids=None):
 return cleared_events
 
 def execute_async(self,
-  key,
-  command,
-  queue=None,
-  executor_config=None):  # pragma: no cover
+  key: TaskInstanceKey,
+  command: CommandType,
+  executor_config: Optional[Any] = None) -> None:  # pragma: no cover
 """
 This method will execute the command asynchronously.
+
+:param key: Unique key for the task instance
+:param command: Command to run
+:param executor_config: Configuration passed to the executor.
 """
 raise NotImplementedError()
 
-def end(self):  # pragma: no cover
+def end(self) -> None:  # pragma: no cover
 """
 This method is called when the caller is done submitting job and
 wants to wait synchronously for the job submitted previously to be
 all done.
 """
 raise NotImplementedError()
-
-def terminate(self):
 
 Review comment:
   Also feels like an API change.


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299527
 
 

 ##
 File path: airflow/executors/kubernetes_executor.py
 ##
 @@ -820,8 +877,14 @@ def _flush_result_queue(self):
 except Empty:
 break
 
-def end(self):
+def end(self) -> None:
 """Called when the executor shuts down"""
+if not self.task_queue:
+raise AirflowException(NOT_STARTED_MESSAGE)
+if not self.result_queue:
+raise AirflowException(NOT_STARTED_MESSAGE)
 
 Review comment:
   All of these feel like they should raise a custom exception subclass instead.
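Here is a sketch of the custom-subclass idea, assuming a hypothetical `ExecutorNotStartedError` that is not an existing Airflow class. The message lives in one place, and callers can catch this specific failure. Existing `except AirflowException` handlers still work because it is a subclass. `AirflowException` is redefined locally so the snippet runs on its own.

```python
from typing import Optional


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException."""


class ExecutorNotStartedError(AirflowException):
    """Hypothetical: raised when an executor method is used before start()."""

    def __init__(self, executor_name: str) -> None:
        super().__init__(f"{executor_name} should be started first")


class DemoExecutor:
    def __init__(self) -> None:
        self.task_queue: Optional[list] = None

    def start(self) -> None:
        self.task_queue = []

    def end(self) -> None:
        if self.task_queue is None:
            raise ExecutorNotStartedError(type(self).__name__)
        self.task_queue.clear()


try:
    DemoExecutor().end()
except AirflowException as err:  # the broader handler still catches the subclass
    print(type(err).__name__, "-", err)
```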


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299335
 
 

 ##
 File path: airflow/executors/celery_executor.py
 ##
 @@ -57,17 +61,17 @@
 
 
 @app.task
-def execute_command(command_to_exec):
+def task(command_to_exec: str):
+"""Executes command."""
 log = LoggingMixin().log
 log.info("Executing command in Celery: %s", command_to_exec)
 env = os.environ.copy()
 try:
 subprocess.check_call(command_to_exec, stderr=subprocess.STDOUT,
   close_fds=True, env=env)
 except subprocess.CalledProcessError as e:
-log.exception('execute_command encountered a CalledProcessError')
+log.exception('execute command encountered a CalledProcessError')
 
 Review comment:
   ```suggestion
   log.exception('execute_command encountered a CalledProcessError')
   ```


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299726
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -35,21 +35,24 @@
 from sqlalchemy import and_, func, not_, or_
 from sqlalchemy.orm.session import make_transient
 
-from airflow import executors, models, settings
+from airflow import models, settings
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
+from airflow.executors.local_executor import LocalExecutor
+from airflow.executors.sequential_executor import SequentialExecutor
 from airflow.jobs.base_job import BaseJob
 from airflow.models import DAG, DagRun, SlaMiss, errors
+from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.stats import Stats
 from airflow.ti_deps.dep_context import SCHEDULEABLE_STATES, SCHEDULED_DEPS, DepContext
 from airflow.ti_deps.deps.pool_slots_available_dep import STATES_TO_COUNT_AS_RUNNING
 from airflow.utils import asciiart, helpers, timezone
 from airflow.utils.dag_processing import (
-AbstractDagFileProcessor, DagFileProcessorAgent, SimpleDag, SimpleDagBag, SimpleTaskInstance,
-list_py_file_paths,
+AbstractDagFileProcessor, DagFileProcessorAgent, SimpleDag, SimpleDagBag,
 
 Review comment:
   Why did SimpleTI get moved but SimpleDag didn't?


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352300093
 
 

 ##
 File path: airflow/utils/dag_processing.py
 ##
 @@ -1394,3 +1196,8 @@ def emit_metrics(self):
 # TODO: Remove before Airflow 2.0
 Stats.gauge('collect_dags', parse_time)
 Stats.gauge('dagbag_import_errors', sum(stat.import_errors for stat in self._file_stats.values()))
+
+# pylint: disable=missing-docstring
+@property
+def file_paths(self):
+return self._file_paths
 
 Review comment:
   API change?


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-30 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r352299897
 
 

 ##
 File path: airflow/models/dagbag.py
 ##
 @@ -317,9 +317,7 @@ def kill_zombies(self, zombies, session=None):
 had a heartbeat for too long, in the current DagBag.
 
 :param zombies: zombie task instances to kill.
-:type zombies: airflow.utils.dag_processing.SimpleTaskInstance
 
 Review comment:
   It sounds like we should add this back; it's for the docs, not the type system, anyway.
   
   The way to avoid imported modules/classes becoming part of the public API of this module is to use `__all__`.
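A minimal sketch of the `__all__` suggestion. The module here is a hypothetical in-memory stand-in named `example_dagbag`, not Airflow's actual `dagbag.py`: it imports a helper but lists only its own class in `__all__`, so `from ... import *` (and documentation tools that honour `__all__`, as Sphinx autodoc does for `:members:`) treat only that class as public.

```python
import sys
import types

# Stand-in for a module such as airflow/models/dagbag.py. The module name
# "example_dagbag" is hypothetical; the module is built in memory so the
# sketch runs on its own.
SOURCE = '''
from collections import OrderedDict  # helper import, not meant to be public API

__all__ = ["DagBag"]


class DagBag:
    """Placeholder for the module's public class."""
'''

module = types.ModuleType("example_dagbag")
exec(SOURCE, module.__dict__)
sys.modules["example_dagbag"] = module

# A star-import only pulls in the names listed in __all__.
namespace = {}
exec("from example_dagbag import *", namespace)
exported = sorted(name for name in namespace if not name.startswith("__"))
print(exported)  # ['DagBag']

# __all__ declares intent; it does not hide the helper from explicit access.
print(hasattr(module, "OrderedDict"))  # True
```

Note that `__all__` only documents and limits the public surface; an explicit `from example_dagbag import OrderedDict` would still work.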


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-27 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r351277914
 
 

 ##
 File path: airflow/utils/dag_processing.py
 ##
 @@ -63,204 +62,109 @@ class SimpleDag(BaseDag):
 :type pickle_id: unicode
 """
 
-def __init__(self, dag, pickle_id=None):
-self._dag_id = dag.dag_id
-self._task_ids = [task.task_id for task in dag.tasks]
-self._full_filepath = dag.full_filepath
-self._is_paused = dag.is_paused
-self._concurrency = dag.concurrency
-self._pickle_id = pickle_id
-self._task_special_args = {}
+def __init__(self, dag, pickle_id: Optional[str] = None):
+self._dag_id: str = dag.dag_id
+self._task_ids: List[str] = [task.task_id for task in dag.tasks]
+self._full_filepath: str = dag.full_filepath
+self._is_paused: bool = dag.is_paused
+self._concurrency: int = dag.concurrency
+self._pickle_id: Optional[str] = pickle_id
+self._task_special_args: Dict[str, Any] = {}
 for task in dag.tasks:
 special_args = {}
 if task.task_concurrency is not None:
 special_args['task_concurrency'] = task.task_concurrency
-if len(special_args) > 0:
+if special_args:
 self._task_special_args[task.task_id] = special_args
 
 @property
-def dag_id(self):
+def dag_id(self) -> str:
 """
 :return: the DAG ID
 :rtype: unicode
 """
 return self._dag_id
 
 @property
-def task_ids(self):
+def task_ids(self) -> List[str]:
 """
 :return: A list of task IDs that are in this DAG
 :rtype: list[unicode]
 """
 return self._task_ids
 
 @property
-def full_filepath(self):
+def full_filepath(self) -> str:
 """
 :return: The absolute path to the file that contains this DAG's definition
 :rtype: unicode
 """
 return self._full_filepath
 
 @property
-def concurrency(self):
+def concurrency(self) -> int:
 """
 :return: maximum number of tasks that can run simultaneously from this DAG
 :rtype: int
 """
 return self._concurrency
 
 @property
-def is_paused(self):
+def is_paused(self) -> bool:
 """
 :return: whether this DAG is paused or not
 :rtype: bool
 """
 return self._is_paused
 
 @property
-def pickle_id(self):
+def pickle_id(self) -> Optional[str]:
 """
 :return: The pickle ID for this DAG, if it has one. Otherwise None.
 :rtype: unicode
 """
 return self._pickle_id
 
 @property
-def task_special_args(self):
+def task_special_args(self) -> Dict[str, Any]:
+"""Special arguments of the task."""
 return self._task_special_args
 
-def get_task_special_arg(self, task_id, special_arg_name):
+def get_task_special_arg(self, task_id: str, special_arg_name: str):
+"""Retrieve special arguments of the task."""
 if task_id in self._task_special_args and special_arg_name in self._task_special_args[task_id]:
 return self._task_special_args[task_id][special_arg_name]
 else:
 return None
 
 
-class SimpleTaskInstance:
-def __init__(self, ti):
-self._dag_id = ti.dag_id
-self._task_id = ti.task_id
-self._execution_date = ti.execution_date
-self._start_date = ti.start_date
-self._end_date = ti.end_date
-self._try_number = ti.try_number
-self._state = ti.state
-self._executor_config = ti.executor_config
-if hasattr(ti, 'run_as_user'):
-self._run_as_user = ti.run_as_user
-else:
-self._run_as_user = None
-if hasattr(ti, 'pool'):
-self._pool = ti.pool
-else:
-self._pool = None
-if hasattr(ti, 'priority_weight'):
-self._priority_weight = ti.priority_weight
-else:
-self._priority_weight = None
-self._queue = ti.queue
-self._key = ti.key
-
-@property
-def dag_id(self):
-return self._dag_id
-
-@property
-def task_id(self):
-return self._task_id
-
-@property
-def execution_date(self):
-return self._execution_date
-
-@property
-def start_date(self):
-return self._start_date
-
-@property
-def end_date(self):
-return self._end_date
-
-@property
-def try_number(self):
-return self._try_number
-
-@property
-def state(self):
-return self._state
-
-@property
-def pool(self):
-return self._pool
-
-@property
-def priority_weight(self):
-return self._priority_weight
-
-@property
-def queue(self):
-

[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports. Depends on [AIRFLOW-6010]

2019-11-25 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports. Depends on [AIRFLOW-6010]
URL: https://github.com/apache/airflow/pull/6596#discussion_r350326731
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -66,7 +67,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
 :param dag_id_white_list: If specified, only look at these DAG ID's
 :type dag_id_white_list: list[unicode]
 :param zombies: zombie task instances to kill
-:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
+:type zombies: list[airflow.executors.executor_types.SimpleTaskInstance]
 
 Review comment:
   Types in doc comments don't respect or use imports; they create links based on the fully qualified name only.
   
   (i.e. mypy respects imports; Sphinx doesn't, and needs the full package path to create a link when rendering.)
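A sketch of the difference, using a hypothetical standalone `kill_zombies` function. The annotation can rely on an import (here guarded by `TYPE_CHECKING`, so only mypy ever performs it), while the Sphinx `:type:` field spells out the full dotted path. The path `airflow.models.taskinstance.SimpleTaskInstance` is taken from the import this PR adds to `scheduler_job.py`; treat it as an assumption if the class moves again.

```python
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
    # Evaluated only by static checkers such as mypy; never imported at
    # runtime, so this sketch runs without Airflow installed.
    from airflow.models.taskinstance import SimpleTaskInstance


def kill_zombies(zombies: "List[SimpleTaskInstance]") -> None:
    """
    Hypothetical stand-in for DagBag.kill_zombies (body omitted).

    :param zombies: zombie task instances to kill
    :type zombies: list[airflow.models.taskinstance.SimpleTaskInstance]
    """
    # mypy resolves the annotation through the import above; Sphinx does not
    # see that import, so the :type: field carries the fully qualified path.
```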


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports. Depends on [AIRFLOW-6010]

2019-11-19 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports. Depends on [AIRFLOW-6010]
URL: https://github.com/apache/airflow/pull/6596#discussion_r347861744
 
 

 ##
 File path: tests/dags/test_subdag.py
 ##
 @@ -24,7 +24,7 @@
 
 from datetime import datetime, timedelta
 
-from airflow.models import DAG
+from airflow.models.dag import DAG
 
 Review comment:
   ## Result of the current situation
   
   > Can you tell which way is better
   
   Neither :) They both work. But yes, having some internal/enforced rules for the Airflow code base is probably a good thing for consistency.
   
   > Seems like people randomly choose the import they should use without 
knowing the consequences
   
   The only consequence is "if we change the layout it might break in the future". Right now it has zero effect on whether import cycles happen.
   
   ## DAG developer's perspective
   
   > Maybe the deprecation warning should only be thrown if you are importing airflow from within airflow core code itself
   
   Yes, if this were achievable I would be happy with this approach.
   
   Yes, the deprecation warning wouldn't _require_ a rewrite, but it would be annoying and noisy until the change was made, i.e. not something I want to force on users without a definite benefit.
   
   > I hope you can agree with me that it makes sense for Airflow "core" in 
airflow repository use a single, direct import (airflow.models.dag.DAG) where 
circular imports will be least likely
   
   Grudgingly, because people will look at the internals and copy them, and I really feel like this is leaking abstractions: `airflow.models.DAG` or `airflow.DAG` is what consumers of the library should be using.
   
   Part of this might be fixed by updating the docs to not build/publish docs for any `airflow.models.*` submodule (and bonus points for rewriting any references to just `airflow.models.Class`?)
   
   ## Avoiding cyclic imports
   
   Avoiding cyclic imports doesn't worry me, as the code either works or it crashes the tests and we fix it on a case-by-case basis. I haven't ever seen a case where what an end user imports causes or avoids a cyclic import; it's all only within Airflow PRs. Is there a case I've not seen where we've got a broken import based on the order in which packages are imported in tests/user code?
   
   ## Importing 'airflow' package first
   
   `python -c 'import airflow.hooks'` will _always_ import `airflow/__init__.py` first and then load `airflow/hooks/__init__.py`. That is how Python imports work.
   
   I'm all in favour of removing the side effects of importing (I reported https://issues.apache.org/jira/browse/AIRFLOW-1931 a long time ago), but the cyclic import issue is just not a concern to me. To my knowledge, import cycles can only be created when _changing_ the Airflow code (i.e. writing a PR); if you aren't changing the code, then it is impossible to get a cycle.
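The point that importing a subpackage always runs the parent package's `__init__.py` first can be checked with a throwaway package. This sketch uses hypothetical names (`demo_airflow`, `demo_import_log`) instead of Airflow itself; each `__init__.py` appends its own module name to a shared list when it executes.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Build a throwaway package that mirrors airflow/__init__.py and
# airflow/hooks/__init__.py. The names "demo_airflow" and "demo_import_log"
# are hypothetical.
root = Path(tempfile.mkdtemp())
(root / "demo_import_log.py").write_text("ORDER = []\n")
hooks_dir = root / "demo_airflow" / "hooks"
hooks_dir.mkdir(parents=True)
record = "import demo_import_log\ndemo_import_log.ORDER.append(__name__)\n"
(root / "demo_airflow" / "__init__.py").write_text(record)
(hooks_dir / "__init__.py").write_text(record)

sys.path.insert(0, str(root))
importlib.invalidate_caches()

# Equivalent to `python -c 'import demo_airflow.hooks'`.
importlib.import_module("demo_airflow.hooks")

# The parent package's __init__ ran before the subpackage's.
print(sys.modules["demo_import_log"].ORDER)  # ['demo_airflow', 'demo_airflow.hooks']
```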


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-18 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r347486502
 
 

 ##
 File path: tests/dags/test_subdag.py
 ##
 @@ -24,7 +24,7 @@
 
 from datetime import datetime, timedelta
 
-from airflow.models import DAG
+from airflow.models.dag import DAG
 
 Review comment:
   > sometimes you do 'from airflow import DAG' sometimes 'from airflow.models import DAG' and sometimes 'from airflow.models.dag import DAG'
   
   From an end user's point of view, I don't want them to have to know which models are in which sub-package, as it is an implementation detail that they shouldn't need to care about. (I mostly don't like the one-class-per-package style, so I want to avoid it where possible. A pet peeve of mine, I know.)
   
   > I am going to deprecate 'from airflow import DAG'
   
   I'm not a fan of this as it would require almost every single DAG that 
exists right now to be changed.
   
   We can manage the cyclic imports by explicitly moving some imports to the 
end of `airflow/__init__.py` 
   
   > An average (or even experienced) developer [user] of Airflow has no idea 
about those consequences of different ways of importing such classes from 
different packages.
   
   Once `import airflow` has happened, all the packages are loaded and import order doesn't matter, so we can handle this all "internally" (which we already do via our unit tests). My understanding is that if it's working right now, then no import cycles are possible, no matter what order the user imports things in.
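A self-contained sketch of the "move some imports to the end of `airflow/__init__.py`" idea, using two hypothetical throwaway packages rather than Airflow itself. In both, `models.py` reads `VERSION` from its parent package; the only difference is whether `__init__.py` imports `models` before or after defining `VERSION`.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Two hypothetical packages with the same cross-import; only the position of
# the submodule import inside __init__.py differs.
MODELS = "from {pkg} import VERSION\n\n\nclass DAG:\n    version = VERSION\n"
INIT_IMPORT_FIRST = "from {pkg}.models import DAG\n\nVERSION = '1.0'\n"
INIT_IMPORT_LAST = "VERSION = '1.0'\n\nfrom {pkg}.models import DAG  # noqa: E402\n"

root = Path(tempfile.mkdtemp())
packages = (("import_first_pkg", INIT_IMPORT_FIRST), ("import_last_pkg", INIT_IMPORT_LAST))
for pkg, init in packages:
    (root / pkg).mkdir()
    (root / pkg / "__init__.py").write_text(init.format(pkg=pkg))
    (root / pkg / "models.py").write_text(MODELS.format(pkg=pkg))
sys.path.insert(0, str(root))
importlib.invalidate_caches()


def try_import(name):
    """Return 'ok', or a description of the ImportError raised."""
    try:
        importlib.import_module(name)
        return "ok"
    except ImportError as err:
        return f"ImportError: {err}"


# models.py runs while the package is only partially initialised, so VERSION
# must already exist when the submodule import happens.
print(try_import("import_first_pkg"))  # fails with an ImportError
print(try_import("import_last_pkg"))
```

Once either package has finished importing successfully, later imports are served from `sys.modules`, which matches the observation above that import order stops mattering after `import airflow`.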


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-18 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r347422690
 
 

 ##
 File path: airflow/executors/dask_executor.py
 ##
 @@ -44,6 +42,7 @@ def __init__(self, cluster_address=None):
 super().__init__(parallelism=0)
 
 def start(self):
+import distributed
 
 Review comment:
   Yeah, delaying the import of `*Executor` until it is needed definitely makes sense to keep.
   
   But delaying this import possibly defers errors due to missing packages for the current/selected executor until task execution, instead of "at start up".
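One way to keep the lazy import but still report a missing package at start-up is an explicit availability check. This is a sketch, not Airflow code: `require_module`, `LazyImportExecutor` and the `fail_fast` flag are hypothetical names, and `importlib.util.find_spec` only confirms that a top-level package can be located, not that it imports cleanly.

```python
import importlib
import importlib.util
from types import ModuleType


def require_module(name: str) -> None:
    """Raise early if an optional top-level package is not installed."""
    if importlib.util.find_spec(name) is None:
        raise RuntimeError(f"Executor dependency {name!r} is not installed")


class LazyImportExecutor:
    """Hypothetical executor that defers importing its optional dependency.

    With fail_fast=True a missing package is reported when the executor is
    created (start-up); otherwise the error only appears once start() runs.
    """

    def __init__(self, dependency: str = "distributed", fail_fast: bool = True):
        self.dependency = dependency
        if fail_fast:
            require_module(dependency)

    def start(self) -> ModuleType:
        # Deferred import, like `import distributed` inside DaskExecutor.start().
        return importlib.import_module(self.dependency)


def outcome(factory):
    """Return the class name of the exception raised by factory(), or 'ok'."""
    try:
        factory()
        return "ok"
    except Exception as err:  # sketch: report any failure by class name
        return type(err).__name__


MISSING = "not_installed_pkg_6596"  # hypothetical package name
print(outcome(lambda: LazyImportExecutor("json").start()))  # ok
print(outcome(lambda: LazyImportExecutor(MISSING)))  # RuntimeError
print(outcome(lambda: LazyImportExecutor(MISSING, fail_fast=False)))  # ok
print(outcome(lambda: LazyImportExecutor(MISSING, fail_fast=False).start()))  # ModuleNotFoundError
```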


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-18 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r347282260
 
 

 ##
 File path: airflow/executors/__init__.py
 ##
 @@ -57,31 +56,25 @@ def get_default_executor():
 return DEFAULT_EXECUTOR
 
 
-class Executors:
 
 Review comment:
   I'm not immediately seeing why we need to pull this out of this file; I think it was okay as it was?
   
   (I'm not a huge fan of one-file-per-class, especially for small classes.)


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-18 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r347286614
 
 

 ##
 File path: airflow/dag_context_manager/__init__.py
 ##
 @@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""DAG context manager used for convenient task creation."""
+# Used by DAG context_managers
+from typing import Optional
+
+from airflow.models.dag import DAG
+
+CONTEXT_MANAGER_DAG = None  # type: Optional[DAG]
 
 Review comment:
   Or even a class property on `class DAG` itself.
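A minimal sketch of keeping the "current DAG" on the DAG class itself instead of in a separate `airflow/dag_context_manager` module. `DAG` and `Task` here are simplified stand-ins, and `_context_stack` / `get_current_dag` are hypothetical names, not Airflow's API. A class-level stack is used instead of a single attribute so that nested `with` blocks restore the outer DAG on exit.

```python
from typing import List, Optional


class DAG:
    """Stripped-down stand-in for airflow.models.dag.DAG; a sketch only."""

    # Class-level state instead of a module-level CONTEXT_MANAGER_DAG:
    # DAGs currently opened with `with`, innermost last.
    _context_stack: List["DAG"] = []

    def __init__(self, dag_id: str):
        self.dag_id = dag_id
        self.task_ids: List[str] = []

    def __enter__(self) -> "DAG":
        DAG._context_stack.append(self)
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        DAG._context_stack.pop()

    @classmethod
    def get_current_dag(cls) -> Optional["DAG"]:
        """Return the innermost DAG opened with `with`, or None outside any."""
        return cls._context_stack[-1] if cls._context_stack else None


class Task:
    """Hypothetical operator that registers itself with the DAG in scope."""

    def __init__(self, task_id: str, dag: Optional[DAG] = None):
        self.task_id = task_id
        self.dag = dag if dag is not None else DAG.get_current_dag()
        if self.dag is not None:
            self.dag.task_ids.append(task_id)


with DAG("outer") as outer:
    Task("extract")
    with DAG("inner") as inner:
        Task("transform")
    Task("load")

print(outer.task_ids)         # ['extract', 'load']
print(inner.task_ids)         # ['transform']
print(DAG.get_current_dag())  # None
```

Because the state lives on the class, nothing outside `dag.py` needs to import `DAG` just to hold the context, which is the closeness to `class DAG` pointed out in the review.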


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-18 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r347285095
 
 

 ##
 File path: tests/dags/test_subdag.py
 ##
 @@ -24,7 +24,7 @@
 
 from datetime import datetime, timedelta
 
-from airflow.models import DAG
+from airflow.models.dag import DAG
 
 Review comment:
   Shouldn't be needed.


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-18 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r347283883
 
 

 ##
 File path: airflow/dag_context_manager/__init__.py
 ##
 @@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""DAG context manager used for convenient task creation."""
+# Used by DAG context_managers
+from typing import Optional
+
+from airflow.models.dag import DAG
+
+CONTEXT_MANAGER_DAG = None  # type: Optional[DAG]
 
 Review comment:
   Rather than its own file, might this make more sense in `airflow/models/dag.py`? It's very closely tied to `class DAG`.


[GitHub] [airflow] ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports

2019-11-18 Thread GitBox
ashb commented on a change in pull request #6596: [AIRFLOW-6004] Untangle 
Executors class to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r347286274
 
 

 ##
 File path: airflow/executors/dask_executor.py
 ##
 @@ -44,6 +42,7 @@ def __init__(self, cluster_address=None):
 super().__init__(parallelism=0)
 
 def start(self):
+import distributed
 
 Review comment:
   The changes to the airflow/executors/* shouldn't be needed now, right?

