This question was posted on StackOverflow here: https://stackoverflow.com/questions/45314174/how-to-dynamically-iterate-over-the-output-of-an-upstream-task-to-create-paralle
Consider the following example of a DAG where the first task, get_id_creds, extracts a list of credentials from a database. This operation tells me which users in my database I am able to run further data preprocessing on, and it writes those IDs to the file /tmp/ids.txt. I then read those IDs into my DAG and use them to generate a list of upload_transactions tasks that can run in parallel.

My question is: is there a more idiomatic, dynamic way to do this with Airflow? What I have here feels clumsy and brittle. How can I directly pass a list of valid IDs from one process to the one that defines the subsequent downstream processes?

    from datetime import datetime, timedelta
    import os
    import sys

    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator

    import ds_dependencies

    SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
    if SCRIPT_PATH:
        sys.path.insert(0, SCRIPT_PATH)
        import dash_workers
    else:
        print('Define DASH_PREPROC_PATH value in environmental variables')
        sys.exit(1)

    default_args = {
        'start_date': datetime.now(),
        'schedule_interval': None
    }

    DAG = DAG(
        dag_id='dash_preproc',
        default_args=default_args)

    get_id_creds = PythonOperator(
        task_id='get_id_creds',
        python_callable=dash_workers.get_id_creds,
        provide_context=True,
        dag=DAG)

    with open('/tmp/ids.txt', 'r') as infile:
        ids = infile.read().splitlines()

    for uid in ids:
        upload_transactions = PythonOperator(
            task_id=uid,
            python_callable=dash_workers.upload_transactions,
            op_args=[uid],
            dag=DAG)
        # each upload task should run after get_id_creds refreshes the ID list
        get_id_creds.set_downstream(upload_transactions)

Thanks in advance!

--
Aaron Polhamus
Director of Data Science
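
P.S. For context, here is a minimal sketch of roughly what the two dash_workers callables are assumed to do, since the module itself isn't shown above. The sqlite3 connection, the credentials table, and the query are placeholders for illustration only, not the real implementation; only the function names and the /tmp/ids.txt handoff come from the setup described above.

    import sqlite3  # stand-in for whatever database driver is actually used

    def get_id_creds(**kwargs):
        # Query the set of user IDs eligible for preprocessing and persist
        # them where the DAG file can pick them up at parse time.
        conn = sqlite3.connect('/tmp/creds.db')  # placeholder connection
        rows = conn.execute('SELECT user_id FROM credentials').fetchall()
        conn.close()
        with open('/tmp/ids.txt', 'w') as outfile:
            outfile.write('\n'.join(str(row[0]) for row in rows))

    def upload_transactions(uid):
        # Per-user preprocessing/upload step; the real logic is elsewhere.
        print('uploading transactions for user {}'.format(uid))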