Thanks Tom, but this way there is no dependency between the steps, right? Here you are just verifying that they both completed. However, I do want step2 to depend on step1 completing successfully. So do I have to go with option 1, i.e. add a step checker after each step to verify completion?

option 1: start_pipeline >> create_emr_cluster >> step1 >> step1_watcher >> step2 >> step2_watcher >> terminate
option 2: start_pipeline >> create_emr_cluster >> step1 >> step2 >> step2_watcher >> terminate

On Thu, Jun 3, 2021 at 5:20 AM Tom Korrison <tom.korri...@concirrus.com> wrote:

> Hi,
>
> I only have one add-steps task, but a step sensor for each step added, e.g.:
>
> start_daily_pipeline = DummyOperator(
>     task_id="start_daily_pipeline",
>     dag=dag,
> )
>
> cluster_creator = EmrCreateJobFlowOperator(
>     task_id="create_job_flow",
>     aws_conn_id="aws_role_default",
>     emr_conn_id="emr_default",
>     job_flow_overrides=JOB_FLOW_OVERRIDES,
>     dag=dag,
> )
>
> step_adder = EmrAddStepsOperator(
>     task_id="add_steps",
>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>     aws_conn_id="aws_role_default",
>     steps=SPARK_STEPS,
>     dag=dag,
> )
>
> step1_checker = EmrStepSensor(
>     task_id="watch_step_1",
>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>     step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
>     aws_conn_id="aws_role_default",
>     dag=dag,
> )
>
> step2_checker = EmrStepSensor(
>     task_id="watch_step_2",
>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>     step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}",
>     aws_conn_id="aws_role_default",
>     dag=dag,
> )
>
> job_flow_checker = EmrJobFlowSensor(
>     task_id="watch_job_flow",
>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>     aws_conn_id="aws_role_default",
>     dag=dag,
> )
>
> cluster_remover = EmrTerminateJobFlowOperator(
>     task_id="remove_cluster",
>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>     aws_conn_id="aws_role_default",
>     dag=dag,
> )
>
> start_daily_pipeline >> cluster_creator >> step_adder
> step_adder >> [step1_checker, step2_checker] >>
> job_flow_checker >> cluster_remover
>
> *From:* Avi Levi <a...@theneura.com>
> *Sent:* 03 June 2021 08:21
> *To:* users@airflow.apache.org
> *Subject:* Create dependencies between emr steps
>
> Hi,
>
> How can I create dependencies between EMR steps? Do I need to create a step watcher after each one like below (option 1), or can they depend on each other directly without the step watcher (option 2)? Meaning something like this:
>
> step1 = EmrAddStepsOperator(
>     task_id="step1",
>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>     aws_conn_id="aws_default",
>     steps=STEP1,
>     dag=dag,
> )
>
> step2 = EmrAddStepsOperator(
>     task_id="step2",
>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>     aws_conn_id="aws_default",
>     steps=STEP2,
>     dag=dag,
> )
>
> step1_watcher = EmrStepSensor(
>     task_id="step_1watcher",
>     job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
>     step_id="{{ task_instance.xcom_pull('step1', key='return_value')[0] }}",
>     aws_conn_id="aws_default",
>     dag=dag,
> )
>
> step2_watcher = EmrStepSensor(
>     task_id="step_2watcher",
>     job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
>     step_id="{{ task_instance.xcom_pull('step2', key='return_value')[0] }}",
>     aws_conn_id="aws_default",
>     dag=dag,
> )
>
> option 1: start_pipeline >> create_emr_cluster >> step1 >> step1_watcher >> step2 >> step2_watcher >> terminate
>
> option 2: start_pipeline >> create_emr_cluster >> step1 >> step2 >> step2_watcher >> terminate
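[Editor's note] One detail in this thread that is easy to trip over: EmrAddStepsOperator pushes the list of EMR step IDs it created to XCom as its return value, and each EmrStepSensor must template a *different* index of that list. A minimal Airflow-free sketch of the indexing (the step IDs below are made up for illustration):

```python
# Sketch of the XCom value the add_steps task (EmrAddStepsOperator)
# pushes: one EMR step ID per entry in SPARK_STEPS. The IDs are
# illustrative placeholders, not real EMR identifiers.
step_ids = ["s-AAAA1111", "s-BBBB2222"]

# watch_step_1 templates return_value[0] and watch_step_2 templates
# return_value[1]; using [0] in both sensors (as in the original
# question) would silently watch the same step twice.
watch_step_1_target = step_ids[0]
watch_step_2_target = step_ids[1]

print(watch_step_1_target)  # s-AAAA1111
print(watch_step_2_target)  # s-BBBB2222
```

In a real DAG these indices appear inside the Jinja template, e.g. `{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}`, and the xcom_pull task id must match an actual task in the DAG.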