This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 3e30b3a025 Use celery worker CLI from Airflow package for Airflow < 2.8.0 (#38879) 3e30b3a025 is described below commit 3e30b3a02584e13fa130255b25756eaf7dfe35d3 Author: Jarek Potiuk <ja...@potiuk.com> AuthorDate: Tue Apr 9 21:44:59 2024 +0200 Use celery worker CLI from Airflow package for Airflow < 2.8.0 (#38879) The Celery provider has an embedded Airflow CLI command as of 3.6.1. When #36794 was merged, we mistakenly thought that it would only be used in Airflow 2.9.0+, so we used a feature introduced in Airflow 2.8.0 in #34945 - but in fact the CLI command is configured by the Celery Executor, which is also part of the Celery provider, so it was also used for Airflow < 2.8.0 and failed due to a missing import. This PR checks if the Airflow version is < 2.8.0 and if so, it falls back to the built-in Airflow CLI command. --- airflow/providers/celery/executors/celery_executor.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/airflow/providers/celery/executors/celery_executor.py b/airflow/providers/celery/executors/celery_executor.py index 2a75be91da..0b4293cde7 100644 --- a/airflow/providers/celery/executors/celery_executor.py +++ b/airflow/providers/celery/executors/celery_executor.py @@ -30,10 +30,12 @@ import operator import time from collections import Counter from concurrent.futures import ProcessPoolExecutor +from importlib.metadata import version as importlib_version from multiprocessing import cpu_count from typing import TYPE_CHECKING, Any, Optional, Sequence, Tuple from celery import states as celery_states +from packaging.version import Version try: from airflow.cli.cli_config import ( @@ -178,11 +180,19 @@ ARG_WITHOUT_GOSSIP = Arg( action="store_true", ) +AIRFLOW_VERSION = Version(importlib_version("apache-airflow")) + +CELERY_CLI_COMMAND_PATH = ( + "airflow.providers.celery.cli.celery_command" + if AIRFLOW_VERSION >= Version("2.8.0") + else 
"airflow.cli.commands.celery_command" +) + CELERY_COMMANDS = ( ActionCommand( name="worker", help="Start a Celery worker node", - func=lazy_load_command("airflow.providers.celery.cli.celery_command.worker"), + func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.worker"), args=( ARG_QUEUES, ARG_CONCURRENCY, @@ -203,7 +213,7 @@ CELERY_COMMANDS = ( ActionCommand( name="flower", help="Start a Celery Flower", - func=lazy_load_command("airflow.providers.celery.cli.celery_command.flower"), + func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.flower"), args=( ARG_FLOWER_HOSTNAME, ARG_FLOWER_PORT, @@ -222,7 +232,7 @@ CELERY_COMMANDS = ( ActionCommand( name="stop", help="Stop the Celery worker gracefully", - func=lazy_load_command("airflow.providers.celery.cli.celery_command.stop_worker"), + func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.stop_worker"), args=(ARG_PID, ARG_VERBOSE), ), )