potiuk opened a new issue, #28275: URL: https://github.com/apache/airflow/issues/28275
### Body I am on a quest now to get rid of the Quarantined tests in CI and I have found a case that indicates that we might have a dangerous race in the core of Airflow executors. This happens rarely and it's next to impossible to reproduce on demand, so I decided to capture it in this issue. Also GitHub Action log will be removed in the future, so I copy the logs and describe the behaviour. The failing case is here: https://github.com/apache/airflow/actions/runs/3652820652/jobs/6171887435 This is the `tests.core.test_impersonation_tests.TestImpersonation testMethod=test_impersonation_subdag` failing - the test is about running a backfill on a subdag with impersonation and it seems that it works fine in most cases, but in some cases it looks like the same task is added twice to the executor Queue and when the task executes, the previously prepared temporary .cfg is already removed by the first execution, when the second execution attempts to start. I looked at the code and various paths, but I could not figure how this can happen. I think it would be great to have more pairs of eyes to look at it. Logs (I added my comments pointing to relevant lines): ``` ----------------------------- Captured stderr call ----------------------------- INFO [airflow.executors.executor_loader] Loaded executor: SequentialExecutor INFO [airflow.executors.sequential_executor.SequentialExecutor] Adding to queue: ['airflow', 'tasks', 'run', 'impersonation_subdag', 'test_subdag_operation', 'backfill__2015-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmp30wvhstz'] INFO [airflow.executors.sequential_executor.SequentialExecutor] Adding to queue: ['airflow', 'tasks', 'run', 'impersonation_subdag.test_subdag_operation', 'exec_python_fn', 'backfill__2015-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmpn88_y_ar'] INFO [airflow.executors.sequential_executor.SequentialExecutor] Adding to queue: ['airflow', 'tasks', 'run', 'impersonation_subdag.test_subdag_operation', 'exec_bash_operator', 'backfill__2015-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmptyfwogyc'] INFO [airflow.executors.sequential_executor.SequentialExecutor] Executing command: ['airflow', 'tasks', 'run', 'impersonation_subdag', 'test_subdag_operation', 'backfill__2015-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmp30wvhstz'] INFO [airflow.executors.sequential_executor.SequentialExecutor] Executing command: ['airflow', 'tasks', 'run', 'impersonation_subdag.test_subdag_operation', 'exec_python_fn', 'backfill__2015-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmpn88_y_ar'] INFO [airflow.executors.sequential_executor.SequentialExecutor] Executing command: ['airflow', 'tasks', 'run', 'impersonation_subdag.test_subdag_operation', 'exec_bash_operator', 'backfill__2015-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmptyfwogyc'] WARNI [airflow.jobs.backfill_job.BackfillJob] Task instance <TaskInstance: impersonation_subdag.test_subdag_operation backfill__2015-01-01T00:00:00+00:00 [up_for_reschedule]> is up for reschedule INFO [airflow.models.dagrun.DagRun] Marking run <DagRun impersonation_subdag.test_subdag_operation @ 2015-01-01T00:00:00+00:00: backfill__2015-01-01T00:00:00+00:00, state:running, queued_at: None. externally triggered: False> successful INFO [airflow.models.dagrun.DagRun] DagRun Finished: dag_id=impersonation_subdag.test_subdag_operation, execution_date=2015-01-01T00:00:00+00:00, run_id=backfill__2015-01-01T00:00:00+00:00, run_start_date=2022-12-08 23:35:11.461535+00:00, run_end_date=2022-12-08 23:36:08.586102+00:00, run_duration=57.124567, state=success, external_trigger=False, run_type=backfill, data_interval_start=2015-01-01T00:00:00+00:00, data_interval_end=2015-01-02T00:00:00+00:00, dag_hash=None INFO [airflow.jobs.backfill_job.BackfillJob] [backfill progress] | finished run 1 of 1 | tasks waiting: 1 | succeeded: 2 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0 INFO [airflow.executors.sequential_executor.SequentialExecutor] Adding to queue: ['airflow', 'tasks', 'run', 'impersonation_subdag', 'test_subdag_operation', 'backfill__2015-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmp4j2_55yd'] ^^ HERE THE TASK IS ADDED TO THE QUEUE (ONCE) INFO [airflow.executors.sequential_executor.SequentialExecutor] Executing command: ['airflow', 'tasks', 'run', 'impersonation_subdag', 'test_subdag_operation', 'backfill__2015-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmp4j2_55yd'] ^^ HERE THE TASK IS EXECUTED FOR THE FIRST TIME INFO [airflow.executors.sequential_executor.SequentialExecutor] Executing command: ['airflow', 'tasks', 'run', 'impersonation_subdag', 'test_subdag_operation', 'backfill__2015-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmp4j2_55yd'] ^^ HERE THE TASK IS EXECUTED FOR THE SECOND (!) TIME vv AND HERE IT FAILS WITH EXCEPTION BECAUSE THE TEMPORARY CONFIG HAS BEEN DELETED BY THE FIRST EXECUTION Traceback (most recent call last): File "/usr/local/bin/airflow", line 33, in <module> sys.exit(load_entry_point('apache-airflow', 'console_scripts', 'airflow')()) File "/opt/airflow/airflow/__main__.py", line 39, in main args.func(args) File "/opt/airflow/airflow/cli/cli_parser.py", line 52, in command return func(*args, **kwargs) File "/opt/airflow/airflow/utils/cli.py", line 108, in wrapper return f(*args, **kwargs) File "/opt/airflow/airflow/cli/commands/task_command.py", line 356, in task_run with open(args.cfg_path) as conf_file: FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp4j2_55yd' ERROR [airflow.executors.sequential_executor.SequentialExecutor] Failed to execute task Command '['airflow', 'tasks', 'run', 'impersonation_subdag', 'test_subdag_operation', 'backfill__2015-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmp4j2_55yd']' returned non-zero exit status 1.. userdel: airflow_test_user mail spool (/var/mail/airflow_test_user) not found ------------------------------ Captured log call ------------------------------- ``` I am afraid it might be a sign of potential race in Airflow's code and I think a really close look at the problem might be useful. ### Committer - [X] I acknowledge that I am a maintainer/committer of the Apache Airflow project. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org