[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-07-28 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1278072688


##
airflow/cli/cli_parser.py:
##
@@ -41,10 +42,23 @@
 core_commands,
 )
 from airflow.exceptions import AirflowException
+from airflow.executors.executor_loader import ExecutorLoader
 from airflow.utils.helpers import partition
 
 airflow_commands = core_commands
 
+log = logging.getLogger(__name__)
+try:
+    executor, _ = ExecutorLoader.import_default_executor_cls(validate=False)
+    airflow_commands.extend(executor.get_cli_commands())
+except Exception:
+    executor_name = ExecutorLoader.get_default_executor_name()
+    log.exception("Failed to load CLI commands from executor: %s", executor_name)
+    log.error("Ensure all dependencies are met and try again")

Review Comment:
   Here is a screenshot of how it will look. Some things to note:
   
   1. I start by uninstalling a dependency of Celery.
   1. If the executor is something like LocalExecutor, we get no warnings and everything works just fine :smiley:
   1. If we switch to CeleryExecutor, we get:
      1. A message saying we failed to load CLI commands from the executor
      1. The traceback and exception message
      1. The helpful tip (now updated with more details from your suggestion, @potiuk)
   1. The CLI command still works despite the issue.
   
   Let me know if you think this suffices.
   ![Screenshot from 2023-07-28 13-46-38](https://github.com/apache/airflow/assets/65743084/35b6a04d-257f-4e65-bbea-f528935504d7)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-07-28 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1278063529


##
airflow/cli/cli_parser.py:
##
@@ -41,10 +42,23 @@
 core_commands,
 )
 from airflow.exceptions import AirflowException
+from airflow.executors.executor_loader import ExecutorLoader
 from airflow.utils.helpers import partition
 
 airflow_commands = core_commands
 
+log = logging.getLogger(__name__)
+try:
+    executor, _ = ExecutorLoader.import_default_executor_cls(validate=False)
+    airflow_commands.extend(executor.get_cli_commands())
+except Exception:
+    executor_name = ExecutorLoader.get_default_executor_name()
+    log.exception("Failed to load CLI commands from executor: %s", executor_name)
+    log.error("Ensure all dependencies are met and try again")

Review Comment:
   Ah, after reviewing this I realized I had already solved it by using 
`log.exception`, which includes the exception message along with the log 
message. So I think we're all good on that front.
   
   But I'll add a bit more info about celery/kubernetes to the follow-up 
message I display.
   @potiuk 
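
   To illustrate the `log.exception` point above, here is a minimal standalone sketch. The logger name, the in-memory stream, and the failing `load_commands` function are all made up for the demo; only the log message format comes from the diff.

```python
import io
import logging

# Hypothetical logger and in-memory stream, used only for this illustration.
stream = io.StringIO()
log = logging.getLogger("cli_parser_demo")
log.addHandler(logging.StreamHandler(stream))
log.setLevel(logging.ERROR)
log.propagate = False


def load_commands():
    # Stand-in for an executor import that fails on a missing dependency.
    raise ImportError("No module named 'celery'")


try:
    load_commands()
except Exception:
    # log.exception logs at ERROR level and appends the active traceback,
    # so the original exception message ends up in the output as well.
    log.exception("Failed to load CLI commands from executor: %s", "CeleryExecutor")

output = stream.getvalue()
print(output)
```

   The printed output contains the log message, the traceback, and the `ImportError` text, so no separate handling of the exception message should be needed.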






[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-07-28 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1278037043


##
tests/providers/celery/executors/test_celery_kubernetes_executor.py:
##
@@ -49,6 +49,9 @@ def test_serve_logs_default_value(self):
     def test_is_single_threaded_default_value(self):
         assert not CeleryKubernetesExecutor.is_single_threaded
 
+    def test_no_cli_commands_vended(self):

Review Comment:
   Yup, the combination executors used to vend no commands, but I changed 
them to vend commands after a comment from Jed. I'll update the test name 
here :+1: 
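
   As a rough sketch of what "vending" from a combination executor could look like (the `Fake*` classes and the string placeholders are hypothetical; the real executors return their own command objects):

```python
class FakeCeleryExecutor:
    """Hypothetical stand-in for the Celery executor."""

    @staticmethod
    def get_cli_commands() -> list:
        return ["celery"]  # placeholder for real command objects


class FakeKubernetesExecutor:
    """Hypothetical stand-in for the Kubernetes executor."""

    @staticmethod
    def get_cli_commands() -> list:
        return ["kubernetes"]  # placeholder for real command objects


class FakeCeleryKubernetesExecutor:
    """Combination executor that vends the commands of both parts."""

    @staticmethod
    def get_cli_commands() -> list:
        return FakeCeleryExecutor.get_cli_commands() + FakeKubernetesExecutor.get_cli_commands()


def test_cli_commands_vended():
    # The renamed test would assert that commands are vended, not that none are.
    assert FakeCeleryKubernetesExecutor.get_cli_commands() == ["celery", "kubernetes"]


test_cli_commands_vended()
```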






[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-07-28 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1278030846


##
airflow/cli/cli_parser.py:
##
@@ -41,10 +42,23 @@
 core_commands,
 )
 from airflow.exceptions import AirflowException
+from airflow.executors.executor_loader import ExecutorLoader
 from airflow.utils.helpers import partition
 
 airflow_commands = core_commands
 
+log = logging.getLogger(__name__)
+try:
+    executor, _ = ExecutorLoader.import_default_executor_cls(validate=False)
+    airflow_commands.extend(executor.get_cli_commands())
+except Exception:
+    executor_name = ExecutorLoader.get_default_executor_name()
+    log.exception("Failed to load CLI commands from executor: %s", executor_name)
+    log.error("Ensure all dependencies are met and try again")

Review Comment:
   > I think it would be good to at least print the message of the exception 
that caused it.
   
   Yeah, that's fair. I did try it with this configuration (as well as 
printing the full stack trace), but it really makes the output of the CLI 
commands ugly. If the user doesn't even care about executor commands, they're 
going to see the exception and a large warning every time, which is a bad 
user experience as well.
   
   But I can try adding it back in and see how it looks.
   
   > We can add a generic advice: "If your executor is based on Celery - you 
need to install 3.3.0+ version of provider, if it is Kubernetes - you need to 
install 7.4.0+ of cncf.kubernetes". While we cannot be sure if this is the 
reason, this will be it in vast majority of cases and might significantly 
increase the number of cases where the users will be able to install provider 
and solve the problem on their own rather than opening an issue to Airflow.
   
   It feels a bit wrong to me to add this much celery/kubernetes-specific 
hardcoding to this generic executor code. I'll try updating the general 
exception message I print here to mention the celery/kube versions, but I 
think we should remove those in the near future.






[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-07-28 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1277926394


##
tests/cli/test_cli_parser.py:
##
@@ -203,31 +204,22 @@ def test_positive_int(self):
             cli_config.positive_int(allow_zero=True)("-1")
 
     @pytest.mark.parametrize(
-        "command",
+        "executor",
         [
-            ["celery"],
-            ["celery", "--help"],
-            ["celery", "worker", "--help"],
-            ["celery", "worker"],
-            ["celery", "flower", "--help"],
-            ["celery", "flower"],
-            ["celery", "stop_worker", "--help"],
-            ["celery", "stop_worker"],
+            "celery",

Review Comment:
   Simplify this test; there is no need to test every subcommand. Also add kubernetes.






[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-07-28 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1277925673


##
tests/cli/conftest.py:
##
@@ -34,6 +34,18 @@
 custom_executor_module.CustomCeleryKubernetesExecutor = type(  # type: ignore
     "CustomCeleryKubernetesExecutor", (celery_kubernetes_executor.CeleryKubernetesExecutor,), {}
 )
+custom_executor_module.CustomCeleryExecutor = type(  # type:  ignore

Review Comment:
   Add some more fake executors to test with below.
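
   For readers unfamiliar with the `type(...)` pattern used in this conftest, here is a self-contained sketch. The base classes and the `CustomLocalExecutor` name are hypothetical stand-ins; only the three-argument `type(name, bases, namespace)` call mirrors the diff.

```python
import types


class BaseFakeExecutor:
    """Hypothetical stand-in for a real executor base class."""

    @staticmethod
    def get_cli_commands() -> list:
        return []


class FakeCeleryExecutor(BaseFakeExecutor):
    """Hypothetical executor that vends one placeholder command."""

    @staticmethod
    def get_cli_commands() -> list:
        return ["celery"]


# A throwaway module object to hang the dynamically created classes on.
custom_executor_module = types.ModuleType("custom_executor")

# type(name, bases, namespace) builds a new subclass without a class statement.
custom_executor_module.CustomCeleryExecutor = type("CustomCeleryExecutor", (FakeCeleryExecutor,), {})
custom_executor_module.CustomLocalExecutor = type("CustomLocalExecutor", (BaseFakeExecutor,), {})
```

   The custom classes inherit `get_cli_commands` from their bases, which is what lets a test check that executors derived from a built-in one still vend the same commands.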






[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-07-28 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1277925303


##
airflow/stats.py:
##
@@ -22,7 +22,6 @@
 from typing import TYPE_CHECKING, Callable
 
 from airflow.configuration import conf
-from airflow.metrics import datadog_logger, otel_logger, statsd_logger

Review Comment:
   These are incredibly expensive imports, especially otel, which were slowing 
down the CLI parser. Import them below only where they are needed.
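
   The deferred-import pattern described here can be sketched as follows. The standard-library modules `decimal` and `fractions` merely stand in for the expensive metrics loggers, and `get_backend` is a hypothetical helper name.

```python
def get_backend(name: str):
    """Return a backend class, importing its module only when it is requested."""
    if name == "decimal":
        # Deferred import: the cost is paid only on this code path.
        from decimal import Decimal

        return Decimal
    if name == "fractions":
        from fractions import Fraction

        return Fraction
    raise ValueError(f"Unknown backend: {name}")


# Importing the module that defines get_backend stays cheap; the backend
# module is loaded on first use.
backend = get_backend("decimal")
print(backend("1.5"))
```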






[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-07-28 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1277924324


##
airflow/cli/cli_parser.py:
##
@@ -41,10 +42,23 @@
 core_commands,
 )
 from airflow.exceptions import AirflowException
+from airflow.executors.executor_loader import ExecutorLoader
 from airflow.utils.helpers import partition
 
 airflow_commands = core_commands
 
+log = logging.getLogger(__name__)
+try:
+    executor, _ = ExecutorLoader.import_default_executor_cls(validate=False)
+    airflow_commands.extend(executor.get_cli_commands())
+except Exception:

Review Comment:
   Under no circumstances should the executor commands fail the parsing 
process. So catch any `Exception` here.
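
   A minimal sketch of that guarantee, assuming a hypothetical `build_commands` helper and placeholder command names (none of these names are from the PR):

```python
import logging

log = logging.getLogger(__name__)

# Placeholder core commands; the real list holds command objects.
core_commands = ["dags", "tasks"]


def build_commands(load_executor_commands) -> list:
    """Return core commands plus executor commands, never raising on load failure."""
    commands = list(core_commands)
    try:
        commands.extend(load_executor_commands())
    except Exception:
        # Any failure in executor code is logged, and parsing continues
        # with the core commands only.
        log.exception("Failed to load CLI commands from executor")
    return commands


def broken_loader():
    # Simulates an executor whose dependency is not installed.
    raise ModuleNotFoundError("No module named 'celery'")


print(build_commands(broken_loader))
print(build_commands(lambda: ["celery"]))
```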






[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-07-28 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1277923794


##
airflow/cli/cli_config.py:
##
@@ -61,46 +58,6 @@ class DefaultHelpParser(argparse.ArgumentParser):
 
     def _check_value(self, action, value):
         """Override _check_value and check conditionally added command."""
-        if action.dest == "subcommand" and value == "celery":

Review Comment:
   All of this was an attempt to handle the situation of using CLI commands 
with missing deps. Now only the configured executor commands are available and 
we assume that the dependencies for that are installed (otherwise the user has 
much bigger issues to worry about).
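
   To make the new behaviour concrete, here is a small `argparse` sketch in which only the configured executor's commands are registered. The `EXECUTOR_COMMANDS` mapping, the `build_parser` helper, and the `airflow-demo` program name are assumptions for illustration, not the actual parser code.

```python
import argparse

# Placeholder command names; the real parser registers full command objects.
CORE_COMMANDS = ["dags", "tasks"]
EXECUTOR_COMMANDS = {
    "CeleryExecutor": ["celery"],
    "KubernetesExecutor": ["kubernetes"],
    "LocalExecutor": [],
}


def build_parser(executor_name: str) -> argparse.ArgumentParser:
    """Register core commands plus those of the configured executor only."""
    parser = argparse.ArgumentParser(prog="airflow-demo")
    subparsers = parser.add_subparsers(dest="subcommand")
    for name in CORE_COMMANDS + EXECUTOR_COMMANDS.get(executor_name, []):
        subparsers.add_parser(name)
    return parser


args = build_parser("CeleryExecutor").parse_args(["celery"])
print(args.subcommand)
```

   With `LocalExecutor` configured, `celery` is simply not a known subcommand, so no special-case dependency check in `_check_value` is needed.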






[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-07-20 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1269833340


##
airflow/executors/celery_kubernetes_executor.py:
##
@@ -231,3 +231,7 @@ def send_callback(self, request: CallbackRequest) -> None:
         if not self.callback_sink:
             raise ValueError("Callback sink is not ready.")
         self.callback_sink.send(request)
+
+    @staticmethod
+    def get_cli_commands() -> list:
+        return []

Review Comment:
   Actually, I'll include both celery and kube. I think it's safe to assume 
that dependencies for both are available even though this code lives within 
one provider. It does create a cross-provider dependency, but I think the user 
has accepted that by virtue of using this executor.






[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-07-20 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1269832478


##
airflow/executors/executor_loader.py:
##
@@ -184,6 +192,31 @@ def validate_database_executor_compatibility(cls, executor: type[BaseExecutor])
         if engine.dialect.name == "sqlite":
             raise AirflowConfigException(f"error: cannot use SQLite with the {executor.__name__}")
 
+    @classmethod
+    def import_all_executor_classes(cls, validate=True) -> list[type[BaseExecutor]]:

Review Comment:
   Actually, I'll include both celery and kube. I think it's safe to assume 
that dependencies for both should be available.






[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-07-20 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1269825704


##
airflow/executors/executor_loader.py:
##
@@ -184,6 +192,31 @@ def validate_database_executor_compatibility(cls, executor: type[BaseExecutor])
         if engine.dialect.name == "sqlite":
             raise AirflowConfigException(f"error: cannot use SQLite with the {executor.__name__}")
 
+    @classmethod
+    def import_all_executor_classes(cls, validate=True) -> list[type[BaseExecutor]]:

Review Comment:
   After some further discussion with @potiuk we're only going to load and 
inject CLI commands for the currently configured executor, so I actually no 
longer need this method. I'll just scrap it altogether for now.






[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-07-20 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1269819841


##
airflow/executors/celery_kubernetes_executor.py:
##
@@ -231,3 +231,7 @@ def send_callback(self, request: CallbackRequest) -> None:
         if not self.callback_sink:
             raise ValueError("Callback sink is not ready.")
         self.callback_sink.send(request)
+
+    @staticmethod
+    def get_cli_commands() -> list:
+        return []

Review Comment:
   This has moved to the celery provider packages since this revision was out. 
I suppose the right thing here is to expose the celery CLI commands.






[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-03-06 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1127136352


##
airflow/cli/cli_parser.py:
##
@@ -17,2165 +17,35 @@
 # specific language governing permissions and limitations
 # under the License.
 """Command-line interface."""
+
+
 from __future__ import annotations
 
 import argparse
-import json
-import os
-import textwrap
-from argparse import Action, ArgumentError, RawTextHelpFormatter
+from argparse import Action, RawTextHelpFormatter
 from functools import lru_cache
-from typing import Callable, Iterable, NamedTuple, Union
-
-import lazy_object_proxy
+from typing import Iterable
 
-from airflow import settings
-from airflow.cli.commands.legacy_commands import check_legacy_command
-from airflow.configuration import conf
+from airflow.cli.cli_config import (
+    DAG_CLI_DICT,
+    ActionCommand,
+    Arg,
+    CLICommand,
+    DefaultHelpParser,
+    GroupCommand,
+    core_commands,
+)
 from airflow.exceptions import AirflowException
-from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR
 from airflow.executors.executor_loader import ExecutorLoader
-from airflow.utils.cli import ColorMode
 from airflow.utils.helpers import partition
-from airflow.utils.module_loading import import_string
-from airflow.utils.timezone import parse as parsedate
-
-BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ
-
-
-def lazy_load_command(import_path: str) -> Callable:
-    """Create a lazy loader for command."""
-    _, _, name = import_path.rpartition(".")
-
-    def command(*args, **kwargs):
-        func = import_string(import_path)
-        return func(*args, **kwargs)
-
-    command.__name__ = name
-
-    return command
-
-
-class DefaultHelpParser(argparse.ArgumentParser):
-    """CustomParser to display help message."""
-
-    def _check_value(self, action, value):
-        """Override _check_value and check conditionally added command."""
-        if action.dest == "subcommand" and value == "celery":
-            executor = conf.get("core", "EXECUTOR")
-            if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                executor_cls, _ = ExecutorLoader.import_executor_cls(executor)
-                classes = ()
-                try:
-                    from airflow.executors.celery_executor import CeleryExecutor
-
-                    classes += (CeleryExecutor,)
-                except ImportError:
-                    message = (
-                        "The celery subcommand requires that you pip install the celery module. "
-                        "To do it, run: pip install 'apache-airflow[celery]'"
-                    )
-                    raise ArgumentError(action, message)
-                try:
-                    from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
-
-                    classes += (CeleryKubernetesExecutor,)
-                except ImportError:
-                    pass
-                if not issubclass(executor_cls, classes):
-                    message = (
-                        f"celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor and "
-                        f"executors derived from them, your current executor: {executor}, subclassed from: "
-                        f'{", ".join([base_cls.__qualname__ for base_cls in executor_cls.__bases__])}'
-                    )
-                    raise ArgumentError(action, message)
-        if action.dest == "subcommand" and value == "kubernetes":
-            try:
-                import kubernetes.client  # noqa: F401
-            except ImportError:
-                message = (
-                    "The kubernetes subcommand requires that you pip install the kubernetes python client. "
-                    "To do it, run: pip install 'apache-airflow[cncf.kubernetes]'"
-                )
-                raise ArgumentError(action, message)
-
-        if action.choices is not None and value not in action.choices:
-            check_legacy_command(action, value)
-
-        super()._check_value(action, value)
-
-    def error(self, message):
-        """Override error and use print_instead of print_usage."""
-        self.print_help()
-        self.exit(2, f"\n{self.prog} command error: {message}, see help above.\n")
-
-
-# Used in Arg to enable `None' as a distinct value from "not passed"
-_UNSET = object()
-
-
-class Arg:
-    """Class to keep information about command line argument."""
-
-    def __init__(
-        self,
-        flags=_UNSET,
-        help=_UNSET,
-        action=_UNSET,
-        default=_UNSET,
-        nargs=_UNSET,
-        type=_UNSET,
-        choices=_UNSET,
-        required=_UNSET,
-        metavar=_UNSET,
-        dest=_UNSET,
-    ):
-        self.flags = flags
-        self.kwargs = {}
-        for k, v in locals().items():
-            if v is _UNSET:
-                continue
-if 

[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-01-24 Thread via GitHub


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1085901682


##
airflow/cli/cli_parser.py:
##

[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands

2023-01-19 Thread GitBox


o-nikolas commented on code in PR #29055:
URL: https://github.com/apache/airflow/pull/29055#discussion_r1082091328


##
airflow/cli/cli_parser.py:
##