Yang Pan created AIRFLOW-2058:
---------------------------------

             Summary: Scheduler uses MainThread for DAG file processing
                 Key: AIRFLOW-2058
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2058
             Project: Apache Airflow
          Issue Type: Bug
          Components: DAG
    Affects Versions: 1.9.0
         Environment: Ubuntu, Airflow 1.9, Sequential executor
            Reporter: Yang Pan


By reading the [source code
|https://github.com/apache/incubator-airflow/blob/61ff29e578d1121ab4606fe122fb4e2db8f075b9/airflow/utils/dag_processing.py#L538],
it appears that the scheduler processes each DAG file, whether a .py or a .zip,
in a new process.
 
If I understand correctly, what should happen in theory when a .zip file is
processed is that the dedicated process adds the .zip file to the PYTHONPATH
and loads the file's modules and dependencies. Once the DAG has been read, the
process is destroyed. Since the PYTHONPATH is process scoped, it won't pollute
other processes.
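
The process-scoping assumption above can be checked in isolation. The sketch below is not Airflow code: it uses a plain subprocess as a stand-in for the scheduler's child process, and the .zip path is a hypothetical placeholder. It only shows that a path entry added in a child never appears in the parent's sys.path.

```python
import subprocess
import sys

# Hypothetical path, used only for illustration; the file need not exist.
FAKE_ZIP = "/tmp/example_dag_bundle.zip"

# Code run by the child interpreter: add the zip to its own sys.path
# and report whether the entry is visible there.
CHILD_CODE = (
    "import sys\n"
    "sys.path.insert(0, sys.argv[1])\n"
    "print(sys.argv[1] in sys.path)\n"
)


def child_sees_path_but_parent_does_not(zip_path=FAKE_ZIP):
    """Return (entry visible in child, entry visible in parent)."""
    output = subprocess.check_output(
        [sys.executable, "-c", CHILD_CODE, zip_path]
    )
    seen_in_child = output.decode().strip() == "True"
    # The parent's sys.path was never touched by the child.
    seen_in_parent = zip_path in sys.path
    return seen_in_child, seen_in_parent


if __name__ == "__main__":
    print(child_sees_path_but_parent_does_not())
```

If the scheduler always handed each file to a fresh child like this, entries from one .zip could not leak into the processing of another.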
 
However, after printing out the thread and process IDs, it looks like the
Airflow scheduler sometimes processes a file in the main process instead of
creating a new one, and that's when the collision happens.
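
One way such a collision can arise, assuming two .zip files ship a module with the same name (this report does not say that they do), is sketched below. The archive names, module name, and ORIGIN attribute are all hypothetical. When both archives are loaded in the same process, the second import is served from the sys.modules cache, so the second archive silently gets the first archive's code.

```python
import importlib
import os
import sys
import tempfile
import zipfile


def make_zip(directory, zip_name, module_name, source):
    """Write a zip archive containing a single top-level module."""
    path = os.path.join(directory, zip_name)
    with zipfile.ZipFile(path, "w") as archive:
        archive.writestr(module_name + ".py", source)
    return path


def load_from_both(module_name="shared_helper_demo"):
    """Import the same module name from two zips within one process."""
    workdir = tempfile.mkdtemp()
    zip_1 = make_zip(workdir, "bundle_1.zip", module_name, "ORIGIN = 'bundle_1'\n")
    zip_2 = make_zip(workdir, "bundle_2.zip", module_name, "ORIGIN = 'bundle_2'\n")

    saved_path = list(sys.path)
    try:
        # "Process" bundle_1: put its zip on sys.path and import its module.
        sys.path.insert(0, zip_1)
        first = importlib.import_module(module_name).ORIGIN
        # "Process" bundle_2 in the SAME process: the module is already
        # cached in sys.modules, so bundle_1's copy is returned again.
        sys.path.insert(0, zip_2)
        second = importlib.import_module(module_name).ORIGIN
    finally:
        # Restore interpreter state so the demo has no side effects.
        sys.path[:] = saved_path
        sys.modules.pop(module_name, None)
    return first, second


if __name__ == "__main__":
    print(load_from_both())
```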
 
Here is a snippet of the PYTHONPATH (printed as sys.path) when
advanced_dag_dependency-1.zip is being processed. As you can see, when it is
executed by MainThread, the path contains other .zip files. When it is executed
by a dedicated DagFileProcessor, only the required .zip is added.
 
sys.path :['/root/airflow/dags/yang_subdag_2.zip', 
'/root/airflow/dags/yang_subdag_2.zip', '/root/airflow/dags/yang_subdag_1.zip', 
'/root/airflow/dags/yang_subdag_1.zip', 
'/root/airflow/dags/advanced_dag_dependency-2.zip', 
'/root/airflow/dags/advanced_dag_dependency-2.zip', 
'/root/airflow/dags/advanced_dag_dependency-1.zip', 
'/root/airflow/dags/advanced_dag_dependency-1.zip', 
'/root/airflow/dags/yang_subdag_1', '/usr/local/bin', '/usr/lib/python2.7', 
'/usr/lib/python2.7/plat-x86_64-linux-gnu', '/usr/lib/python2.7/lib-tk', 
'/usr/lib/python2.7/lib-old', '/usr/lib/python2.7/lib-dynload', 
'/usr/local/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages', 
'/usr/lib/python2.7/dist-packages/PILcompat', '/root/airflow/config', 
'/root/airflow/dags', '/root/airflow/plugins'] 
Print from MyFirstOperator in Dag 1 
process id: 5059 
thread id: <_MainThread(*MainThread*, started 140339858560768)> 
 
sys.path :[u'/root/airflow/dags/advanced_dag_dependency-1.zip', 
'/usr/local/bin', '/usr/lib/python2.7', 
'/usr/lib/python2.7/plat-x86_64-linux-gnu', '/usr/lib/python2.7/lib-tk', 
'/usr/lib/python2.7/lib-old', '/usr/lib/python2.7/lib-dynload', 
'/usr/local/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages', 
'/usr/lib/python2.7/dist-packages/PILcompat', '/root/airflow/config', 
'/root/airflow/dags', '/root/airflow/plugins'] 
Print from MyFirstOperator in Dag 1 
process id: 5076 
thread id: <_MainThread(*DagFileProcessor283*, started 140137838294784)> 
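
The code that produced the output above is not part of this report. A minimal sketch of an equivalent diagnostic, which could be placed at the top of a DAG file or inside an operator module, might look like the following. The function names are hypothetical and the output format only approximates the lines shown above.

```python
import os
import sys
import threading


def describe_context():
    """Collect the import path and the process/thread running this code."""
    return {
        "sys_path": list(sys.path),
        "process_id": os.getpid(),
        "thread": threading.current_thread(),
    }


def format_context(context):
    """Render the collected context in a layout similar to the report."""
    return "\n".join([
        "sys.path :%r" % (context["sys_path"],),
        "process id: %d" % context["process_id"],
        "thread id: %r" % (context["thread"],),
    ])


if __name__ == "__main__":
    print(format_context(describe_context()))
```

Comparing the process id and thread name across runs shows whether a given file was parsed in the scheduler's main process or in a separate child.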



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
