This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e30b3a025 Use celery worker CLI from Airflow package for Airflow < 2.8.0 (#38879)
3e30b3a025 is described below

commit 3e30b3a02584e13fa130255b25756eaf7dfe35d3
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Tue Apr 9 21:44:59 2024 +0200

    Use celery worker CLI from Airflow package for Airflow < 2.8.0 (#38879)
    
    The Celery provider has an embedded Airflow CLI command as of 3.6.1. When
    #36794 was merged, we mistakenly thought that it would only be used
    in Airflow 2.9.0+, so we used a feature introduced in Airflow 2.8.0 in
    #34945 - but in fact the CLI command is configured by the Celery
    Executor, which is also part of the Celery provider, so it was also
    used for Airflow < 2.8.0 and failed due to a missing import.
    
    This PR checks whether the Airflow version is < 2.8.0 and, if so, falls
    back to the built-in Airflow CLI command.
---
 airflow/providers/celery/executors/celery_executor.py | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/celery/executors/celery_executor.py b/airflow/providers/celery/executors/celery_executor.py
index 2a75be91da..0b4293cde7 100644
--- a/airflow/providers/celery/executors/celery_executor.py
+++ b/airflow/providers/celery/executors/celery_executor.py
@@ -30,10 +30,12 @@ import operator
 import time
 from collections import Counter
 from concurrent.futures import ProcessPoolExecutor
+from importlib.metadata import version as importlib_version
 from multiprocessing import cpu_count
 from typing import TYPE_CHECKING, Any, Optional, Sequence, Tuple
 
 from celery import states as celery_states
+from packaging.version import Version
 
 try:
     from airflow.cli.cli_config import (
@@ -178,11 +180,19 @@ ARG_WITHOUT_GOSSIP = Arg(
     action="store_true",
 )
 
+AIRFLOW_VERSION = Version(importlib_version("apache-airflow"))
+
+CELERY_CLI_COMMAND_PATH = (
+    "airflow.providers.celery.cli.celery_command"
+    if AIRFLOW_VERSION >= Version("2.8.0")
+    else "airflow.cli.commands.celery_command"
+)
+
 CELERY_COMMANDS = (
     ActionCommand(
         name="worker",
         help="Start a Celery worker node",
-        func=lazy_load_command("airflow.providers.celery.cli.celery_command.worker"),
+        func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.worker"),
         args=(
             ARG_QUEUES,
             ARG_CONCURRENCY,
@@ -203,7 +213,7 @@ CELERY_COMMANDS = (
     ActionCommand(
         name="flower",
         help="Start a Celery Flower",
-        func=lazy_load_command("airflow.providers.celery.cli.celery_command.flower"),
+        func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.flower"),
         args=(
             ARG_FLOWER_HOSTNAME,
             ARG_FLOWER_PORT,
@@ -222,7 +232,7 @@ CELERY_COMMANDS = (
     ActionCommand(
         name="stop",
         help="Stop the Celery worker gracefully",
-        func=lazy_load_command("airflow.providers.celery.cli.celery_command.stop_worker"),
+        func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.stop_worker"),
         args=(ARG_PID, ARG_VERBOSE),
     ),
 )

Reply via email to