> > So do I have to go with option 1? Meaning, add a step_checker after each step > to verify completion?
One thing you can do to make this less onerous is to subclass EmrAddStepsOperator and add a property that produces the sensor it needs. You would then have something like: task1 = EmrAddStepsOperator(...); task1_sensor = task1.sensor_task; task1 >> task1_sensor. Another option is to create an EmrAddStepsSynchronousOperator that does the waiting in the same task. On Thu, Jun 3, 2021 at 7:46 AM Avi Levi <a...@theneura.com> wrote: > Thanks Tom, > But this way there is no dependency between the steps, right? Here you are > just verifying that they have both completed. However, I do want step2 to > depend on step1 completing successfully. > So do I have to go with option 1? Meaning, add a step_checker after each step > to verify completion? > option 1: start_pipeline >> create_emr_cluster >> step1 >> step1_watcher > >> step2 >> step2_watcher >> terminate > > option 2: start_pipeline >> create_emr_cluster >> step1 >> step2 >> > step2_watcher >> terminate > > On Thu, Jun 3, 2021 at 5:20 AM Tom Korrison <tom.korri...@concirrus.com> > wrote: > >> Hi, >> >> >> >> I only have one add_steps task, but a step sensor for each step added. >> >> >> >> e.g.
>> >> >> >> start_daily_pipeline = DummyOperator( >> task_id="start_daily_pipeline", >> dag=dag >> ) >> >> cluster_creator = EmrCreateJobFlowOperator( >> task_id="create_job_flow", >> aws_conn_id="aws_role_default", >> emr_conn_id="emr_default", >> job_flow_overrides=JOB_FLOW_OVERRIDES, >> dag=dag, >> ) >> >> step_adder = EmrAddStepsOperator( >> task_id="add_steps", >> job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', >> key='return_value') }}", >> aws_conn_id="aws_role_default", >> steps=SPARK_STEPS, >> dag=dag, >> ) >> >> step1_checker = EmrStepSensor( >> task_id="watch_step_1", >> job_flow_id="{{ task_instance.xcom_pull('create_job_flow', >> key='return_value') }}", >> step_id="{{ task_instance.xcom_pull(task_ids='add_steps', >> key='return_value')[0] }}", >> aws_conn_id="aws_role_default", >> dag=dag, >> ) >> >> step2_checker = EmrStepSensor( >> task_id="watch_step_2", >> job_flow_id="{{ task_instance.xcom_pull('create_job_flow', >> key='return_value') }}", >> step_id="{{ task_instance.xcom_pull(task_ids='add_steps', >> key='return_value')[1] }}", >> aws_conn_id="aws_role_default", >> dag=dag, >> ) >> >> job_flow_checker = EmrJobFlowSensor( >> task_id="watch_job_flow", >> job_flow_id="{{ task_instance.xcom_pull('create_job_flow', >> key='return_value') }}", >> aws_conn_id="aws_role_default", >> dag=dag, >> ) >> >> cluster_remover = EmrTerminateJobFlowOperator( >> task_id="remove_cluster", >> job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', >> key='return_value') }}", >> aws_conn_id="aws_role_default", >> dag=dag, >> ) >> >> >> start_daily_pipeline >> cluster_creator >> step_adder >> [step1_checker, step2_checker] >> job_flow_checker >> >> cluster_remover >> >> >> >> >> >> *From:* Avi Levi <a...@theneura.com> >> *Sent:* 03 June 2021 08:21 >> *To:* users@airflow.apache.org >> *Subject:* Create dependencies between emr steps >>
>> >> Hi, >> >> How can I create dependencies between EMR steps? Do I need to create a >> step watcher between each one like below (option 1), or can I skip the >> step_watcher and make the steps depend on each other directly (option 2)? That is, >> something like this: >> >> >> >> step1 = EmrAddStepsOperator( >> task_id="step1", >> job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', >> key='return_value') }}", >> aws_conn_id="aws_default", >> steps=STEP1, >> dag=dag, >> ) >> >> >> >> step2 = EmrAddStepsOperator( >> task_id="step2", >> job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', >> key='return_value') }}", >> aws_conn_id="aws_default", >> steps=STEP2, >> dag=dag, >> ) >> >> >> >> step1_watcher = EmrStepSensor( >> task_id="step_1watcher", >> job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', >> key='return_value') }}", >> step_id="{{ task_instance.xcom_pull('step1', key='return_value')[0] >> }}", >> aws_conn_id="aws_default", >> dag=dag, >> ) >> >> >> >> step2_watcher = EmrStepSensor( >> task_id="step_2watcher", >> job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', >> key='return_value') }}", >> step_id="{{ task_instance.xcom_pull('step2', key='return_value')[0] >> }}", >> aws_conn_id="aws_default", >> dag=dag, >> ) >> >> >> >> option 1: start_pipeline >> create_emr_cluster >> step1 >> >> step1_watcher >> step2 >> step2_watcher >> terminate >> >> >> >> option 2: start_pipeline >> create_emr_cluster >> step1 >> step2 >> >> step2_watcher >> terminate >> >> >> >