potiuk opened a new issue, #28275:
URL: https://github.com/apache/airflow/issues/28275

   ### Body
   
   I am on a quest now to get rid of the Quarantined tests in CI and I have 
found a case that indicates that we might have a dangerous race in the core of 
Airflow executors. This happens rarely and it's next to impossible to reproduce 
on demand, so I decided to capture it in this issue. Also GitHub Action log 
will be removed in the future, so I copy the logs and describe the behaviour.
   
   The failing case is here: 
https://github.com/apache/airflow/actions/runs/3652820652/jobs/6171887435
   
   This is the `tests.core.test_impersonation_tests.TestImpersonation 
testMethod=test_impersonation_subdag` failing - the test is about running a 
backfill on a subdag with impersonation and it seems that it works fine in most 
cases, but in some cases it looks like the same task is added twice to the 
executor Queue and when the task executes, the previously prepared temporary 
.cfg is already removed by the first execution, when the second execution 
attempts to start.
   
   I looked at the code and various paths, but I could not figure how this can 
happen. I think it would be great to have more pairs of eyes to look at it.
   
   Logs (I added my comments pointing to relevant lines):
   
   ```
    ----------------------------- Captured stderr call 
-----------------------------
     INFO  [airflow.executors.executor_loader] Loaded executor: 
SequentialExecutor
     INFO  [airflow.executors.sequential_executor.SequentialExecutor] Adding to 
queue: ['airflow', 'tasks', 'run', 'impersonation_subdag', 
'test_subdag_operation', 'backfill__2015-01-01T00:00:00+00:00', '--local', 
'--pool', 'default_pool', '--subdir', 
'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmp30wvhstz']
     INFO  [airflow.executors.sequential_executor.SequentialExecutor] Adding to 
queue: ['airflow', 'tasks', 'run', 
'impersonation_subdag.test_subdag_operation', 'exec_python_fn', 
'backfill__2015-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', 
'--subdir', 'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', 
'/tmp/tmpn88_y_ar']
     INFO  [airflow.executors.sequential_executor.SequentialExecutor] Adding to 
queue: ['airflow', 'tasks', 'run', 
'impersonation_subdag.test_subdag_operation', 'exec_bash_operator', 
'backfill__2015-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', 
'--subdir', 'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', 
'/tmp/tmptyfwogyc']
     INFO  [airflow.executors.sequential_executor.SequentialExecutor] Executing 
command: ['airflow', 'tasks', 'run', 'impersonation_subdag', 
'test_subdag_operation', 'backfill__2015-01-01T00:00:00+00:00', '--local', 
'--pool', 'default_pool', '--subdir', 
'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmp30wvhstz']
     INFO  [airflow.executors.sequential_executor.SequentialExecutor] Executing 
command: ['airflow', 'tasks', 'run', 
'impersonation_subdag.test_subdag_operation', 'exec_python_fn', 
'backfill__2015-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', 
'--subdir', 'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', 
'/tmp/tmpn88_y_ar']
     INFO  [airflow.executors.sequential_executor.SequentialExecutor] Executing 
command: ['airflow', 'tasks', 'run', 
'impersonation_subdag.test_subdag_operation', 'exec_bash_operator', 
'backfill__2015-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', 
'--subdir', 'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', 
'/tmp/tmptyfwogyc']
     WARNI [airflow.jobs.backfill_job.BackfillJob] Task instance <TaskInstance: 
impersonation_subdag.test_subdag_operation backfill__2015-01-01T00:00:00+00:00 
[up_for_reschedule]> is up for reschedule
     INFO  [airflow.models.dagrun.DagRun] Marking run <DagRun 
impersonation_subdag.test_subdag_operation @ 2015-01-01T00:00:00+00:00: 
backfill__2015-01-01T00:00:00+00:00, state:running, queued_at: None. externally 
triggered: False> successful
     INFO  [airflow.models.dagrun.DagRun] DagRun Finished: 
dag_id=impersonation_subdag.test_subdag_operation, 
execution_date=2015-01-01T00:00:00+00:00, 
run_id=backfill__2015-01-01T00:00:00+00:00, run_start_date=2022-12-08 
23:35:11.461535+00:00, run_end_date=2022-12-08 23:36:08.586102+00:00, 
run_duration=57.124567, state=success, external_trigger=False, 
run_type=backfill, data_interval_start=2015-01-01T00:00:00+00:00, 
data_interval_end=2015-01-02T00:00:00+00:00, dag_hash=None
     INFO  [airflow.jobs.backfill_job.BackfillJob] [backfill progress] | 
finished run 1 of 1 | tasks waiting: 1 | succeeded: 2 | running: 0 | failed: 0 
| skipped: 0 | deadlocked: 0 | not ready: 0
   
     INFO  [airflow.executors.sequential_executor.SequentialExecutor] Adding to 
queue: ['airflow', 'tasks', 'run', 'impersonation_subdag', 
'test_subdag_operation', 'backfill__2015-01-01T00:00:00+00:00', '--local', 
'--pool', 'default_pool', '--subdir', 
'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmp4j2_55yd']
    
   ^^ HERE THE TASK IS ADDED TO THE QUEUE  (ONCE)
   
    INFO  [airflow.executors.sequential_executor.SequentialExecutor] Executing 
command: ['airflow', 'tasks', 'run', 'impersonation_subdag', 
'test_subdag_operation', 'backfill__2015-01-01T00:00:00+00:00', '--local', 
'--pool', 'default_pool', '--subdir', 
'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmp4j2_55yd']
   
   ^^ HERE THE TASK IS EXECUTED FOR THE FIRST TIME
   
     INFO  [airflow.executors.sequential_executor.SequentialExecutor] Executing 
command: ['airflow', 'tasks', 'run', 'impersonation_subdag', 
'test_subdag_operation', 'backfill__2015-01-01T00:00:00+00:00', '--local', 
'--pool', 'default_pool', '--subdir', 
'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmp4j2_55yd']
   
   ^^ HERE THE TASK IS EXECUTED FOR THE SECOND (!) TIME
   
   vv AND HERE IT FAILS WITH EXCEPTION BECAUSE THE TEMPORARY CONFIG HAS BEEN 
DELETED BY THE FIRST EXECUTION
   
     Traceback (most recent call last):
       File "/usr/local/bin/airflow", line 33, in <module>
         sys.exit(load_entry_point('apache-airflow', 'console_scripts', 
'airflow')())
       File "/opt/airflow/airflow/__main__.py", line 39, in main
         args.func(args)
       File "/opt/airflow/airflow/cli/cli_parser.py", line 52, in command
         return func(*args, **kwargs)
       File "/opt/airflow/airflow/utils/cli.py", line 108, in wrapper
         return f(*args, **kwargs)
       File "/opt/airflow/airflow/cli/commands/task_command.py", line 356, in 
task_run
         with open(args.cfg_path) as conf_file:
     FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp4j2_55yd'
     ERROR [airflow.executors.sequential_executor.SequentialExecutor] Failed to 
execute task Command '['airflow', 'tasks', 'run', 'impersonation_subdag', 
'test_subdag_operation', 'backfill__2015-01-01T00:00:00+00:00', '--local', 
'--pool', 'default_pool', '--subdir', 
'DAGS_FOLDER/test_impersonation_subdag.py', '--cfg-path', '/tmp/tmp4j2_55yd']' 
returned non-zero exit status 1..
     userdel: airflow_test_user mail spool (/var/mail/airflow_test_user) not 
found
     ------------------------------ Captured log call 
-------------------------------
   ```
   
   I am afraid it might be a sign of potential race in Airflow's code and I 
think a really close look at the problem might be useful.
   
   ### Committer
   
   - [X] I acknowledge that I am a maintainer/committer of the Apache Airflow 
project.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to