[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1278072688 ## airflow/cli/cli_parser.py: ## @@ -41,10 +42,23 @@ core_commands, ) from airflow.exceptions import AirflowException +from airflow.executors.executor_loader import ExecutorLoader from airflow.utils.helpers import partition airflow_commands = core_commands +log = logging.getLogger(__name__) +try: +executor, _ = ExecutorLoader.import_default_executor_cls(validate=False) +airflow_commands.extend(executor.get_cli_commands()) +except Exception: +executor_name = ExecutorLoader.get_default_executor_name() +log.exception("Failed to load CLI commands from executor: %s", executor_name) +log.error("Ensure all dependencies are met and try again") Review Comment: Here is a screenshot of how it will look. Some things to note: 1. I start by uninstalling a dependency of Celery 1. Notice how if the executor is something like LocalExecutor, we get no warnings and everything works just fine :smiley: 1. If we switch to CeleryExecutor, we get: 1. A message saying we failed to load CLI commands from executor 1. The traceback and Exception message 1. And the helpful tip (now updated with more details from your suggestion @potiuk) 1. The CLI command still works despite the issue Let me know if you think this suffices ![Screenshot from 2023-07-28 13-46-38](https://github.com/apache/airflow/assets/65743084/35b6a04d-257f-4e65-bbea-f528935504d7) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1278063529 ## airflow/cli/cli_parser.py: ## @@ -41,10 +42,23 @@ core_commands, ) from airflow.exceptions import AirflowException +from airflow.executors.executor_loader import ExecutorLoader from airflow.utils.helpers import partition airflow_commands = core_commands +log = logging.getLogger(__name__) +try: +executor, _ = ExecutorLoader.import_default_executor_cls(validate=False) +airflow_commands.extend(executor.get_cli_commands()) +except Exception: +executor_name = ExecutorLoader.get_default_executor_name() +log.exception("Failed to load CLI commands from executor: %s", executor_name) +log.error("Ensure all dependencies are met and try again") Review Comment: Ah, actually, after reviewing this I'm realizing I actually already solved this by using `log.exception`. That includes the exception message along with the log message. So I think we're all good on that front. But I'll add a bit more info to the following message I display to have some info about celery/kubernetes @potiuk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1278037043 ## tests/providers/celery/executors/test_celery_kubernetes_executor.py: ## @@ -49,6 +49,9 @@ def test_serve_logs_default_value(self): def test_is_single_threaded_default_value(self): assert not CeleryKubernetesExecutor.is_single_threaded +def test_no_cli_commands_vended(self): Review Comment: Yupp, the combination executors used to not vend commands, but I changed it to vend after a comment from Jed. I'll update the name here :+1: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1278030846 ## airflow/cli/cli_parser.py: ## @@ -41,10 +42,23 @@ core_commands, ) from airflow.exceptions import AirflowException +from airflow.executors.executor_loader import ExecutorLoader from airflow.utils.helpers import partition airflow_commands = core_commands +log = logging.getLogger(__name__) +try: +executor, _ = ExecutorLoader.import_default_executor_cls(validate=False) +airflow_commands.extend(executor.get_cli_commands()) +except Exception: +executor_name = ExecutorLoader.get_default_executor_name() +log.exception("Failed to load CLI commands from executor: %s", executor_name) +log.error("Ensure all dependencies are met and try again") Review Comment: > I think it would be good to at least print the mesage of the exception that caused it. Yeah, that's fair, and I did try it with this configuration (as well as printing the full stack trace) but it really makes the output of the CLI commands ugly, and if the user doesn't even care about executor commands they're going to see the exception and a large warning every time which is a bad user experience as well. But I can try add it back in again and see how it looks. > We can add a generic advice: "If your executor is based on Celery - you need to install 3.3.0+ version of provider, if it is Kubernetes - yoy need to install 7.4.0+ of cncf.kubernetes. While we cannot be sure if this is the reason, this will be it in vast majority of cases and might significantly incrase the number of cases where the users will be able to install provider and solve the problem on their own rather than opening an issue to Airflow. It feels a bit wrong to me to add this much celery/kubernetes specific hardcoding into this generic executor code. I'll try update the general exception message I print here to mention the celery/kube versions, but I think we should remove those soon in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1277926394 ## tests/cli/test_cli_parser.py: ## @@ -203,31 +204,22 @@ def test_positive_int(self): cli_config.positive_int(allow_zero=True)("-1") @pytest.mark.parametrize( -"command", +"executor", [ -["celery"], -["celery", "--help"], -["celery", "worker", "--help"], -["celery", "worker"], -["celery", "flower", "--help"], -["celery", "flower"], -["celery", "stop_worker", "--help"], -["celery", "stop_worker"], +"celery", Review Comment: Simplify this test, no need to test every sub command, also add kubernetes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1277925673 ## tests/cli/conftest.py: ## @@ -34,6 +34,18 @@ custom_executor_module.CustomCeleryKubernetesExecutor = type( # type: ignore "CustomCeleryKubernetesExecutor", (celery_kubernetes_executor.CeleryKubernetesExecutor,), {} ) +custom_executor_module.CustomCeleryExecutor = type( # type: ignore Review Comment: Add some more fake executors to test with below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1277925303 ## airflow/stats.py: ## @@ -22,7 +22,6 @@ from typing import TYPE_CHECKING, Callable from airflow.configuration import conf -from airflow.metrics import datadog_logger, otel_logger, statsd_logger Review Comment: These are incredibly expensive imports, especially otel, which were slowing down the CLI parser. Import them below only where they are needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1277924324 ## airflow/cli/cli_parser.py: ## @@ -41,10 +42,23 @@ core_commands, ) from airflow.exceptions import AirflowException +from airflow.executors.executor_loader import ExecutorLoader from airflow.utils.helpers import partition airflow_commands = core_commands +log = logging.getLogger(__name__) +try: +executor, _ = ExecutorLoader.import_default_executor_cls(validate=False) +airflow_commands.extend(executor.get_cli_commands()) +except Exception: Review Comment: Under no circumstances should the executor commands fail the parsing process. So except any Exception here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1277923794 ## airflow/cli/cli_config.py: ## @@ -61,46 +58,6 @@ class DefaultHelpParser(argparse.ArgumentParser): def _check_value(self, action, value): """Override _check_value and check conditionally added command.""" -if action.dest == "subcommand" and value == "celery": Review Comment: All of this was an attempt to handle the situation of using CLI commands with missing deps. Now only the configured executor commands are available and we assume that the dependencies for that are installed (otherwise the user has much bigger issues to worry about). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1269833340 ## airflow/executors/celery_kubernetes_executor.py: ## @@ -231,3 +231,7 @@ def send_callback(self, request: CallbackRequest) -> None: if not self.callback_sink: raise ValueError("Callback sink is not ready.") self.callback_sink.send(request) + +@staticmethod +def get_cli_commands() -> list: +return [] Review Comment: Actually, I'll include both celery and kube. I think it's safe to assume that dependencies for both should be available even if this code lives within one provider, but it does create a cross provider dependency (but I think by virtue of using this executor the user has accepted that). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1269832478 ## airflow/executors/executor_loader.py: ## @@ -184,6 +192,31 @@ def validate_database_executor_compatibility(cls, executor: type[BaseExecutor]) if engine.dialect.name == "sqlite": raise AirflowConfigException(f"error: cannot use SQLite with the {executor.__name__}") +@classmethod +def import_all_executor_classes(cls, validate=True) -> list[type[BaseExecutor]]: Review Comment: Actually, I'll include both celery and kube. I think it's safe to assume that dependencies for both should be available. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1269832478 ## airflow/executors/executor_loader.py: ## @@ -184,6 +192,31 @@ def validate_database_executor_compatibility(cls, executor: type[BaseExecutor]) if engine.dialect.name == "sqlite": raise AirflowConfigException(f"error: cannot use SQLite with the {executor.__name__}") +@classmethod +def import_all_executor_classes(cls, validate=True) -> list[type[BaseExecutor]]: Review Comment: Actually, I'll include both celery and kube. I think it's safe to assume that dependencies for both should be available. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1269825704 ## airflow/executors/executor_loader.py: ## @@ -184,6 +192,31 @@ def validate_database_executor_compatibility(cls, executor: type[BaseExecutor]) if engine.dialect.name == "sqlite": raise AirflowConfigException(f"error: cannot use SQLite with the {executor.__name__}") +@classmethod +def import_all_executor_classes(cls, validate=True) -> list[type[BaseExecutor]]: Review Comment: After some further discussion with @potiuk we're only going to load and inject CLI commands for the currently configured executor, so I actually no longer need this method. I'll just scrap it altogether for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1269819841 ## airflow/executors/celery_kubernetes_executor.py: ## @@ -231,3 +231,7 @@ def send_callback(self, request: CallbackRequest) -> None: if not self.callback_sink: raise ValueError("Callback sink is not ready.") self.callback_sink.send(request) + +@staticmethod +def get_cli_commands() -> list: +return [] Review Comment: This has moved to the celery provider packages since this revision was out. I suppose the right thing here is to expose the celery CLI commands. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1127136352 ## airflow/cli/cli_parser.py: ## @@ -17,2165 +17,35 @@ # specific language governing permissions and limitations # under the License. """Command-line interface.""" + + from __future__ import annotations import argparse -import json -import os -import textwrap -from argparse import Action, ArgumentError, RawTextHelpFormatter +from argparse import Action, RawTextHelpFormatter from functools import lru_cache -from typing import Callable, Iterable, NamedTuple, Union - -import lazy_object_proxy +from typing import Iterable -from airflow import settings -from airflow.cli.commands.legacy_commands import check_legacy_command -from airflow.configuration import conf +from airflow.cli.cli_config import ( +DAG_CLI_DICT, +ActionCommand, +Arg, +CLICommand, +DefaultHelpParser, +GroupCommand, +core_commands, +) from airflow.exceptions import AirflowException -from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR from airflow.executors.executor_loader import ExecutorLoader -from airflow.utils.cli import ColorMode from airflow.utils.helpers import partition -from airflow.utils.module_loading import import_string -from airflow.utils.timezone import parse as parsedate - -BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ - - -def lazy_load_command(import_path: str) -> Callable: -"""Create a lazy loader for command.""" -_, _, name = import_path.rpartition(".") - -def command(*args, **kwargs): -func = import_string(import_path) -return func(*args, **kwargs) - -command.__name__ = name - -return command - - -class DefaultHelpParser(argparse.ArgumentParser): -"""CustomParser to display help message.""" - -def _check_value(self, action, value): -"""Override _check_value and check conditionally added command.""" -if action.dest == "subcommand" and value == "celery": -executor = conf.get("core", "EXECUTOR") -if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR): -executor_cls, _ = ExecutorLoader.import_executor_cls(executor) -classes = () -try: -from airflow.executors.celery_executor import CeleryExecutor - -classes += (CeleryExecutor,) -except ImportError: -message = ( -"The celery subcommand requires that you pip install the celery module. " -"To do it, run: pip install 'apache-airflow[celery]'" -) -raise ArgumentError(action, message) -try: -from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor - -classes += (CeleryKubernetesExecutor,) -except ImportError: -pass -if not issubclass(executor_cls, classes): -message = ( -f"celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor and " -f"executors derived from them, your current executor: {executor}, subclassed from: " -f'{", ".join([base_cls.__qualname__ for base_cls in executor_cls.__bases__])}' -) -raise ArgumentError(action, message) -if action.dest == "subcommand" and value == "kubernetes": -try: -import kubernetes.client # noqa: F401 -except ImportError: -message = ( -"The kubernetes subcommand requires that you pip install the kubernetes python client. " -"To do it, run: pip install 'apache-airflow[cncf.kubernetes]'" -) -raise ArgumentError(action, message) - -if action.choices is not None and value not in action.choices: -check_legacy_command(action, value) - -super()._check_value(action, value) - -def error(self, message): -"""Override error and use print_instead of print_usage.""" -self.print_help() -self.exit(2, f"\n{self.prog} command error: {message}, see help above.\n") - - -# Used in Arg to enable `None' as a distinct value from "not passed" -_UNSET = object() - - -class Arg: -"""Class to keep information about command line argument.""" - -def __init__( -self, -flags=_UNSET, -help=_UNSET, -action=_UNSET, -default=_UNSET, -nargs=_UNSET, -type=_UNSET, -choices=_UNSET, -required=_UNSET, -metavar=_UNSET, -dest=_UNSET, -): -self.flags = flags -self.kwargs = {} -for k, v in locals().items(): -if v is _UNSET: -continue -if
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1085901682 ## airflow/cli/cli_parser.py: ## @@ -17,2165 +17,35 @@ # specific language governing permissions and limitations # under the License. """Command-line interface.""" + + from __future__ import annotations import argparse -import json -import os -import textwrap -from argparse import Action, ArgumentError, RawTextHelpFormatter +from argparse import Action, RawTextHelpFormatter from functools import lru_cache -from typing import Callable, Iterable, NamedTuple, Union - -import lazy_object_proxy +from typing import Iterable -from airflow import settings -from airflow.cli.commands.legacy_commands import check_legacy_command -from airflow.configuration import conf +from airflow.cli.cli_config import ( +DAG_CLI_DICT, +ActionCommand, +Arg, +CLICommand, +DefaultHelpParser, +GroupCommand, +core_commands, +) from airflow.exceptions import AirflowException -from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR from airflow.executors.executor_loader import ExecutorLoader -from airflow.utils.cli import ColorMode from airflow.utils.helpers import partition -from airflow.utils.module_loading import import_string -from airflow.utils.timezone import parse as parsedate - -BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ - - -def lazy_load_command(import_path: str) -> Callable: -"""Create a lazy loader for command.""" -_, _, name = import_path.rpartition(".") - -def command(*args, **kwargs): -func = import_string(import_path) -return func(*args, **kwargs) - -command.__name__ = name - -return command - - -class DefaultHelpParser(argparse.ArgumentParser): -"""CustomParser to display help message.""" - -def _check_value(self, action, value): -"""Override _check_value and check conditionally added command.""" -if action.dest == "subcommand" and value == "celery": -executor = conf.get("core", "EXECUTOR") -if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR): -executor_cls, _ = ExecutorLoader.import_executor_cls(executor) -classes = () -try: -from airflow.executors.celery_executor import CeleryExecutor - -classes += (CeleryExecutor,) -except ImportError: -message = ( -"The celery subcommand requires that you pip install the celery module. " -"To do it, run: pip install 'apache-airflow[celery]'" -) -raise ArgumentError(action, message) -try: -from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor - -classes += (CeleryKubernetesExecutor,) -except ImportError: -pass -if not issubclass(executor_cls, classes): -message = ( -f"celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor and " -f"executors derived from them, your current executor: {executor}, subclassed from: " -f'{", ".join([base_cls.__qualname__ for base_cls in executor_cls.__bases__])}' -) -raise ArgumentError(action, message) -if action.dest == "subcommand" and value == "kubernetes": -try: -import kubernetes.client # noqa: F401 -except ImportError: -message = ( -"The kubernetes subcommand requires that you pip install the kubernetes python client. " -"To do it, run: pip install 'apache-airflow[cncf.kubernetes]'" -) -raise ArgumentError(action, message) - -if action.choices is not None and value not in action.choices: -check_legacy_command(action, value) - -super()._check_value(action, value) - -def error(self, message): -"""Override error and use print_instead of print_usage.""" -self.print_help() -self.exit(2, f"\n{self.prog} command error: {message}, see help above.\n") - - -# Used in Arg to enable `None' as a distinct value from "not passed" -_UNSET = object() - - -class Arg: -"""Class to keep information about command line argument.""" - -def __init__( -self, -flags=_UNSET, -help=_UNSET, -action=_UNSET, -default=_UNSET, -nargs=_UNSET, -type=_UNSET, -choices=_UNSET, -required=_UNSET, -metavar=_UNSET, -dest=_UNSET, -): -self.flags = flags -self.kwargs = {} -for k, v in locals().items(): -if v is _UNSET: -continue -if
[GitHub] [airflow] o-nikolas commented on a diff in pull request #29055: [AIP-51] Executors vending CLI commands
o-nikolas commented on code in PR #29055: URL: https://github.com/apache/airflow/pull/29055#discussion_r1082091328 ## airflow/cli/cli_parser.py: ## @@ -17,2165 +17,35 @@ # specific language governing permissions and limitations # under the License. """Command-line interface.""" + + from __future__ import annotations import argparse -import json -import os -import textwrap -from argparse import Action, ArgumentError, RawTextHelpFormatter +from argparse import Action, RawTextHelpFormatter from functools import lru_cache -from typing import Callable, Iterable, NamedTuple, Union - -import lazy_object_proxy +from typing import Iterable -from airflow import settings -from airflow.cli.commands.legacy_commands import check_legacy_command -from airflow.configuration import conf +from airflow.cli.cli_config import ( +DAG_CLI_DICT, +ActionCommand, +Arg, +CLICommand, +DefaultHelpParser, +GroupCommand, +core_commands, +) from airflow.exceptions import AirflowException -from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR from airflow.executors.executor_loader import ExecutorLoader -from airflow.utils.cli import ColorMode from airflow.utils.helpers import partition -from airflow.utils.module_loading import import_string -from airflow.utils.timezone import parse as parsedate - -BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ - - -def lazy_load_command(import_path: str) -> Callable: -"""Create a lazy loader for command.""" -_, _, name = import_path.rpartition(".") - -def command(*args, **kwargs): -func = import_string(import_path) -return func(*args, **kwargs) - -command.__name__ = name - -return command - - -class DefaultHelpParser(argparse.ArgumentParser): -"""CustomParser to display help message.""" - -def _check_value(self, action, value): -"""Override _check_value and check conditionally added command.""" -if action.dest == "subcommand" and value == "celery": -executor = conf.get("core", "EXECUTOR") -if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR): -executor_cls, _ = ExecutorLoader.import_executor_cls(executor) -classes = () -try: -from airflow.executors.celery_executor import CeleryExecutor - -classes += (CeleryExecutor,) -except ImportError: -message = ( -"The celery subcommand requires that you pip install the celery module. " -"To do it, run: pip install 'apache-airflow[celery]'" -) -raise ArgumentError(action, message) -try: -from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor - -classes += (CeleryKubernetesExecutor,) -except ImportError: -pass -if not issubclass(executor_cls, classes): -message = ( -f"celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor and " -f"executors derived from them, your current executor: {executor}, subclassed from: " -f'{", ".join([base_cls.__qualname__ for base_cls in executor_cls.__bases__])}' -) -raise ArgumentError(action, message) -if action.dest == "subcommand" and value == "kubernetes": -try: -import kubernetes.client # noqa: F401 -except ImportError: -message = ( -"The kubernetes subcommand requires that you pip install the kubernetes python client. " -"To do it, run: pip install 'apache-airflow[cncf.kubernetes]'" -) -raise ArgumentError(action, message) - -if action.choices is not None and value not in action.choices: -check_legacy_command(action, value) - -super()._check_value(action, value) - -def error(self, message): -"""Override error and use print_instead of print_usage.""" -self.print_help() -self.exit(2, f"\n{self.prog} command error: {message}, see help above.\n") - - -# Used in Arg to enable `None' as a distinct value from "not passed" -_UNSET = object() - - -class Arg: -"""Class to keep information about command line argument.""" - -def __init__( -self, -flags=_UNSET, -help=_UNSET, -action=_UNSET, -default=_UNSET, -nargs=_UNSET, -type=_UNSET, -choices=_UNSET, -required=_UNSET, -metavar=_UNSET, -dest=_UNSET, -): -self.flags = flags -self.kwargs = {} -for k, v in locals().items(): -if v is _UNSET: -continue -if