Hi, I have only one add_steps task, but one step sensor for each step it adds.
For example:

start_daily_pipeline = DummyOperator(task_id="start_daily_pipeline", dag=dag)

cluster_creator = EmrCreateJobFlowOperator(
    task_id="create_job_flow",
    aws_conn_id="aws_role_default",
    emr_conn_id="emr_default",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    dag=dag,
)

step_adder = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id="aws_role_default",
    steps=SPARK_STEPS,
    dag=dag,
)

step1_checker = EmrStepSensor(
    task_id="watch_step_1",
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)

step2_checker = EmrStepSensor(
    task_id="watch_step_2",
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)

job_flow_checker = EmrJobFlowSensor(
    task_id="watch_job_flow",
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id="remove_cluster",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)

start_daily_pipeline >> cluster_creator >> step_adder
step_adder >> [step1_checker, step2_checker] >> job_flow_checker >> cluster_remover

From: Avi Levi <a...@theneura.com>
Sent: 03 June 2021 08:21
To: users@airflow.apache.org
Subject: Create dependencies between emr steps

Hi,

How can I create dependencies between EMR steps? Do I need to put a step watcher between each pair of steps, as in option 1 below, or can the steps depend on each other directly without a step watcher, as in option 2?
I mean something like this:

step1 = EmrAddStepsOperator(
    task_id="step1",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=STEP1,
    dag=dag,
)

step2 = EmrAddStepsOperator(
    task_id="step2",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=STEP2,
    dag=dag,
)

step1_watcher = EmrStepSensor(
    task_id="step_1watcher",
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('step1', key='return_value')[0] }}",
    aws_conn_id="aws_default",
    dag=dag,
)

step2_watcher = EmrStepSensor(
    task_id="step_2watcher",
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('step2', key='return_value')[0] }}",
    aws_conn_id="aws_default",
    dag=dag,
)

Option 1:
start_pipeline >> create_emr_cluster >> step1 >> step1_watcher >> step2 >> step2_watcher >> terminate

Option 2:
start_pipeline >> create_emr_cluster >> step1 >> step2 >> step2_watcher >> terminate